source: branches/supervisor/src/main/java/omq/server/InvocationThread.java @ 99

Last change on this file since 99 was 96, checked in by stoda, 11 years ago

ObjectMQ without RemoteWrapper? - one consumer per thread -

File size: 7.2 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationTargetException;
5import java.util.Properties;
6
7import omq.common.broker.Broker;
8import omq.common.message.Request;
9import omq.common.message.Response;
10import omq.common.util.ParameterQueue;
11import omq.common.util.Serializer;
12import omq.exception.OmqException;
13import omq.exception.SerializerException;
14
15import org.apache.log4j.Logger;
16
17import com.rabbitmq.client.AMQP.BasicProperties;
18import com.rabbitmq.client.Channel;
19import com.rabbitmq.client.ConsumerCancelledException;
20import com.rabbitmq.client.QueueingConsumer;
21import com.rabbitmq.client.QueueingConsumer.Delivery;
22import com.rabbitmq.client.ShutdownSignalException;
23
24/**
25 * An invocationThread waits for requests an invokes them.
26 *
27 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
28 *
29 */
30public class InvocationThread extends Thread {
31
32        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
33        private static final String multi = "multi#";
34
35        // RemoteObject
36        private RemoteObject obj;
37        private String UID;
38        private Properties env;
39
40        // Broker
41        private Broker broker;
42        private Serializer serializer;
43
44        // Consumer
45        private Channel channel;
46        private QueueingConsumer consumer;
47        private String multiQueue;
48        private boolean killed = false;
49
50        public InvocationThread(RemoteObject obj, Broker broker) throws Exception {
51                this.obj = obj;
52                this.UID = obj.getRef();
53                this.env = obj.getEnv();
54                this.broker = broker;
55                this.serializer = broker.getSerializer();
56        }
57
58        @Override
59        public synchronized void start() {
60                try {
61                        startQueues();
62                        super.start();
63                } catch (Exception e) {
64                        logger.error("Cannot start a remoteObject", e);
65                }
66
67        }
68
69        @Override
70        public void run() {
71                while (!killed) {
72                        try {
73                                // Get the delivery
74                                Delivery delivery = consumer.nextDelivery();
75
76                                String serializerType = delivery.getProperties().getType();
77
78                                // Deserialize the json
79                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
80                                String methodName = request.getMethod();
81                                String requestID = request.getId();
82
83                                logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType);
84
85                                // Invoke the method
86                                Object result = null;
87                                OmqException error = null;
88                                try {
89                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
90                                } catch (InvocationTargetException e) {
91                                        Throwable throwable = e.getTargetException();
92                                        logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
93                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
94                                } catch (NoSuchMethodException e) {
95                                        logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
96                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
97                                }
98
99                                // Reply if it's necessary
100                                if (!request.isAsync()) {
101                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
102
103                                        BasicProperties props = delivery.getProperties();
104
105                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
106
107                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
108                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
109                                        logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: "
110                                                        + props.getReplyTo());
111                                }
112
113                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
114                        } catch (InterruptedException i) {
115                                logger.error(i);
116                        } catch (ShutdownSignalException e) {
117                                logger.error(e);
118                                try {
119                                        if (channel.isOpen()) {
120                                                channel.close();
121                                        }
122                                        startQueues();
123                                } catch (Exception e1) {
124                                        try {
125                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
126                                                Thread.sleep(milis);
127                                        } catch (InterruptedException e2) {
128                                                logger.error(e2);
129                                        }
130                                        logger.error(e1);
131                                }
132                        } catch (ConsumerCancelledException e) {
133                                logger.error(e);
134                        } catch (SerializerException e) {
135                                logger.error(e);
136                        } catch (Exception e) {
137                                logger.error(e);
138                        }
139
140                }
141        }
142
143        /**
144         * This method starts the queues using the information got in the
145         * environment.
146         *
147         * @throws Exception
148         */
149        private void startQueues() throws Exception {
150                // Start channel
151                channel = broker.getNewChannel();
152
153                /*
154                 * Default queue, Round Robin behaviour
155                 */
156
157                // Get info about which exchange and queue will use
158                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
159                String queue = UID;
160                String routingKey = UID;
161
162                // RemoteObject default queue
163                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
164                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
165                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
166
167                // Declares and bindings
168                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
169                        channel.exchangeDeclare(exchange, "direct");
170                }
171                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
172                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
173                        channel.queueBind(queue, exchange, routingKey);
174                }
175                logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
176                                + exclusive + ", AutoDelete: " + autoDelete);
177
178                /*
179                 * Multi queue, exclusive per each instance
180                 */
181
182                // Get info about the multiQueue
183                String multiExchange = multi + UID;
184                // TODO:String multiExchange = multi + exchange + UID;
185                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
186
187                // Multi queue (exclusive queue per remoteObject)
188                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
189                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
190                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
191
192                // Declares and bindings
193                channel.exchangeDeclare(multiExchange, "fanout");
194                if (multiQueue == null) {
195                        multiQueue = channel.queueDeclare().getQueue();
196                } else {
197                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
198                }
199                channel.queueBind(multiQueue, multiExchange, "");
200                logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
201                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
202
203                /*
204                 * Consumer
205                 */
206
207                boolean autoAck = false;
208
209                int prefetchCount = 1;
210                channel.basicQos(prefetchCount);
211
212                // Declare a new consumer
213                consumer = new QueueingConsumer(channel);
214                channel.basicConsume(queue, autoAck, consumer);
215                channel.basicConsume(multiQueue, autoAck, consumer);
216        }
217
218        public void kill() throws IOException {
219                logger.info("Killing objectmq: " + UID + " thread id");
220                killed = true;
221                interrupt();
222                channel.close();
223        }
224
225        public RemoteObject getObj() {
226                return obj;
227        }
228
229        public void setObj(RemoteObject obj) {
230                this.obj = obj;
231        }
232
233}
Note: See TracBrowser for help on using the repository browser.