source: branches/supervisor/src/main/java/omq/server/InvocationThread.java @ 107

Last change on this file since 107 was 107, checked in by stoda, 11 years ago

Error detected: there should be only one thread listening to the multiexchange queue.
TODO: change this. Make refactor in the invocationthread. Change remotethreadpool to achieve this behavior.
AInvocationThread <- InvocationTHread

<- MultiInvocationTHread

File size: 8.3 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationTargetException;
5import java.util.Properties;
6
7import omq.common.broker.Broker;
8import omq.common.message.Request;
9import omq.common.message.Response;
10import omq.common.util.ParameterQueue;
11import omq.common.util.Serializer;
12import omq.exception.OmqException;
13import omq.exception.SerializerException;
14
15import org.apache.log4j.Logger;
16
17import com.rabbitmq.client.AMQP.BasicProperties;
18import com.rabbitmq.client.Channel;
19import com.rabbitmq.client.ConsumerCancelledException;
20import com.rabbitmq.client.QueueingConsumer;
21import com.rabbitmq.client.QueueingConsumer.Delivery;
22import com.rabbitmq.client.ShutdownSignalException;
23
24/**
25 * An invocationThread waits for requests an invokes them.
26 *
27 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
28 *
29 */
30public class InvocationThread extends Thread {
31
32        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
33        private static final String multi = "multi#";
34
35        // RemoteObject
36        private RemoteObject obj;
37        private String reference;
38        private String UID;
39        private Properties env;
40        private boolean idle;
41        private long lastExec;
42
43        private RemoteThreadPool pool;
44
45        // Broker
46        private Broker broker;
47        private Serializer serializer;
48
49        // Consumer
50        private Channel channel;
51        private QueueingConsumer consumer;
52        private String multiQueue;
53        private boolean killed = false;
54
55        public InvocationThread(RemoteObject obj) throws Exception {
56                this.obj = obj;
57                this.UID = obj.getUID();
58                this.reference = obj.getRef();
59                this.env = obj.getEnv();
60                this.broker = obj.getBroker();
61                this.pool = obj.getPool();
62                this.serializer = broker.getSerializer();
63                this.lastExec = 0;
64                this.idle = true;
65        }
66
67        @Override
68        public synchronized void start() {
69                try {
70                        startQueues();
71                        super.start();
72                } catch (Exception e) {
73                        logger.error("Cannot start a remoteObject", e);
74                }
75
76        }
77
78        @Override
79        public void run() {
80                while (!killed) {
81                        try {
82                                // Get the delivery
83                                Delivery delivery = consumer.nextDelivery();
84
85                                // This thread gets busy
86                                pool.getBusy().incrementAndGet();
87                                idle = false;
88
89                                String serializerType = delivery.getProperties().getType();
90
91                                // Deserialize the request
92                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
93                                String methodName = request.getMethod();
94                                String requestID = request.getId();
95
96                                logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType);
97
98                                // Invoke the method
99                                Object result = null;
100                                OmqException error = null;
101                                try {
102                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
103                                } catch (InvocationTargetException e) {
104                                        Throwable throwable = e.getTargetException();
105                                        logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
106                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
107                                } catch (NoSuchMethodException e) {
108                                        logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
109                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
110                                }
111
112                                // Reply if it's necessary
113                                if (!request.isAsync()) {
114                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
115
116                                        BasicProperties props = delivery.getProperties();
117
118                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
119
120                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
121                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
122                                        logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: "
123                                                        + props.getReplyTo());
124                                }
125
126                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
127
128                                // The thread is now idle
129                                lastExec = System.currentTimeMillis();
130                                idle = true;
131                                pool.getBusy().decrementAndGet();
132
133                        } catch (InterruptedException i) {
134                                logger.error(i);
135                        } catch (ShutdownSignalException e) {
136                                logger.error(e);
137                                try {
138                                        if (channel.isOpen()) {
139                                                channel.close();
140                                        }
141                                        startQueues();
142                                } catch (Exception e1) {
143                                        try {
144                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
145                                                Thread.sleep(milis);
146                                        } catch (InterruptedException e2) {
147                                                logger.error(e2);
148                                        }
149                                        logger.error(e1);
150                                }
151                        } catch (ConsumerCancelledException e) {
152                                logger.error(e);
153                        } catch (SerializerException e) {
154                                logger.error(e);
155                        } catch (Exception e) {
156                                e.printStackTrace();
157                                logger.error(e);
158                        }
159
160                }
161                logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed");
162        }
163
164        /**
165         * This method starts the queues using the information got in the
166         * environment.
167         *
168         * @throws Exception
169         */
170        private void startQueues() throws Exception {
171                // Start channel
172                channel = broker.getNewChannel();
173
174                // Get info about which exchange and queue will use
175                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
176                String queue = reference;
177                String routingKey = reference;
178
179                // RemoteObject default queue
180                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
181                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
182                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
183
184                // Declares and bindings
185                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
186                        channel.exchangeDeclare(exchange, "direct");
187                }
188                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
189                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
190                        channel.queueBind(queue, exchange, routingKey);
191                }
192                logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
193                                + exclusive + ", AutoDelete: " + autoDelete);
194
195                /*
196                 * UID queue
197                 */
198
199                if (UID != null) {
200
201                        boolean uidDurable = false;
202                        boolean uidExclusive = true;
203                        boolean uidAutoDelete = true;
204
205                        channel.queueDeclare(UID, uidDurable, uidExclusive, uidAutoDelete, null);
206                        if (!exchange.equalsIgnoreCase("")) { // Default exchange case
207                                channel.queueBind(UID, exchange, UID);
208                        }
209                }
210
211                /*
212                 * Multi queue, exclusive per each instance
213                 */
214
215                // Get info about the multiQueue
216                String multiExchange = multi + reference;
217                // TODO:String multiExchange = multi + exchange + reference;
218                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
219
220                // Multi queue (exclusive queue per remoteObject)
221                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
222                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
223                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
224
225                // Declares and bindings
226                channel.exchangeDeclare(multiExchange, "fanout");
227                if (multiQueue == null) {
228                        multiQueue = channel.queueDeclare().getQueue();
229                } else {
230                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
231                }
232                channel.queueBind(multiQueue, multiExchange, "");
233                logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
234                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
235
236                /*
237                 * Consumer
238                 */
239
240                // Disable Round Robin behavior
241                boolean autoAck = false;
242
243                int prefetchCount = 1;
244                channel.basicQos(prefetchCount);
245
246                // Declare a new consumer
247                consumer = new QueueingConsumer(channel);
248                channel.basicConsume(queue, autoAck, consumer);
249                channel.basicConsume(multiQueue, autoAck, consumer);
250                if (UID != null) {
251                        channel.basicConsume(UID, autoAck, consumer);
252                }
253        }
254
255        public void kill() throws IOException {
256                logger.info("Killing objectmq: " + reference + " thread id");
257                killed = true;
258                interrupt();
259                channel.close();
260        }
261
262        public RemoteObject getObj() {
263                return obj;
264        }
265
266        public void setObj(RemoteObject obj) {
267                this.obj = obj;
268        }
269
270        public long getLastExecution() {
271                return lastExec;
272        }
273
274        public boolean isIdle() {
275                return idle;
276        }
277
278}
Note: See TracBrowser for help on using the repository browser.