source: branches/supervisor/src/main/java/omq/server/InvocationThread.java @ 102

Last change on this file since 102 was 102, checked in by stoda, 11 years ago

All tests working

TODO sometimes exceptiontest fails

File size: 7.8 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationTargetException;
5import java.util.Properties;
6
7import omq.common.broker.Broker;
8import omq.common.message.Request;
9import omq.common.message.Response;
10import omq.common.util.ParameterQueue;
11import omq.common.util.Serializer;
12import omq.exception.OmqException;
13import omq.exception.SerializerException;
14
15import org.apache.log4j.Logger;
16
17import com.rabbitmq.client.AMQP.BasicProperties;
18import com.rabbitmq.client.Channel;
19import com.rabbitmq.client.ConsumerCancelledException;
20import com.rabbitmq.client.QueueingConsumer;
21import com.rabbitmq.client.QueueingConsumer.Delivery;
22import com.rabbitmq.client.ShutdownSignalException;
23
24/**
25 * An invocationThread waits for requests an invokes them.
26 *
27 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
28 *
29 */
30public class InvocationThread extends Thread {
31
32        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
33        private static final String multi = "multi#";
34
35        // RemoteObject
36        private RemoteObject obj;
37        private String UID;
38        private Properties env;
39        private boolean idle;
40        private long lastExec;
41
42        private RemoteThreadPool pool;
43
44        // Broker
45        private Broker broker;
46        private Serializer serializer;
47
48        // Consumer
49        private Channel channel;
50        private QueueingConsumer consumer;
51        private String multiQueue;
52        private boolean killed = false;
53
54        public InvocationThread(RemoteObject obj) throws Exception {
55                this.obj = obj;
56                this.UID = obj.getRef();
57                this.env = obj.getEnv();
58                this.broker = obj.getBroker();
59                this.pool = obj.getPool();
60                this.serializer = broker.getSerializer();
61                this.lastExec = 0;
62                this.idle = true;
63        }
64
65        @Override
66        public synchronized void start() {
67                try {
68                        startQueues();
69                        super.start();
70                } catch (Exception e) {
71                        logger.error("Cannot start a remoteObject", e);
72                }
73
74        }
75
76        @Override
77        public void run() {
78                while (!killed) {
79                        try {
80                                // Get the delivery
81                                Delivery delivery = consumer.nextDelivery();
82
83                                // This thread gets busy
84                                pool.getBusy().incrementAndGet();
85                                idle = false;
86
87                                String serializerType = delivery.getProperties().getType();
88
89                                // Deserialize the json
90                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
91                                String methodName = request.getMethod();
92                                String requestID = request.getId();
93
94                                logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType);
95
96                                // Invoke the method
97                                Object result = null;
98                                OmqException error = null;
99                                try {
100                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
101                                } catch (InvocationTargetException e) {
102                                        Throwable throwable = e.getTargetException();
103                                        logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
104                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
105                                } catch (NoSuchMethodException e) {
106                                        logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
107                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
108                                }
109
110                                // Reply if it's necessary
111                                if (!request.isAsync()) {
112                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
113
114                                        BasicProperties props = delivery.getProperties();
115
116                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
117
118                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
119                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
120                                        logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: "
121                                                        + props.getReplyTo());
122                                }
123
124                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
125
126                                // The thread is now idle
127                                lastExec = System.currentTimeMillis();
128                                idle = true;
129                                pool.getBusy().decrementAndGet();
130
131                        } catch (InterruptedException i) {
132                                logger.error(i);
133                        } catch (ShutdownSignalException e) {
134                                logger.error(e);
135                                try {
136                                        if (channel.isOpen()) {
137                                                channel.close();
138                                        }
139                                        startQueues();
140                                } catch (Exception e1) {
141                                        try {
142                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
143                                                Thread.sleep(milis);
144                                        } catch (InterruptedException e2) {
145                                                logger.error(e2);
146                                        }
147                                        logger.error(e1);
148                                }
149                        } catch (ConsumerCancelledException e) {
150                                logger.error(e);
151                        } catch (SerializerException e) {
152                                logger.error(e);
153                        } catch (Exception e) {
154                                e.printStackTrace();
155                                logger.error(e);
156                        }
157
158                }
159                logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed");
160        }
161
162        /**
163         * This method starts the queues using the information got in the
164         * environment.
165         *
166         * @throws Exception
167         */
168        private void startQueues() throws Exception {
169                // Start channel
170                channel = broker.getNewChannel();
171
172                /*
173                 * Default queue, Round Robin behaviour
174                 */
175
176                // Get info about which exchange and queue will use
177                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
178                String queue = UID;
179                String routingKey = UID;
180
181                // RemoteObject default queue
182                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
183                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
184                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
185
186                // Declares and bindings
187                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
188                        channel.exchangeDeclare(exchange, "direct");
189                }
190                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
191                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
192                        channel.queueBind(queue, exchange, routingKey);
193                }
194                logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
195                                + exclusive + ", AutoDelete: " + autoDelete);
196
197                /*
198                 * Multi queue, exclusive per each instance
199                 */
200
201                // Get info about the multiQueue
202                String multiExchange = multi + UID;
203                // TODO:String multiExchange = multi + exchange + UID;
204                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
205
206                // Multi queue (exclusive queue per remoteObject)
207                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
208                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
209                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
210
211                // Declares and bindings
212                channel.exchangeDeclare(multiExchange, "fanout");
213                if (multiQueue == null) {
214                        multiQueue = channel.queueDeclare().getQueue();
215                } else {
216                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
217                }
218                channel.queueBind(multiQueue, multiExchange, "");
219                logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
220                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
221
222                /*
223                 * Consumer
224                 */
225
226                boolean autoAck = false;
227
228                int prefetchCount = 1;
229                channel.basicQos(prefetchCount);
230
231                // Declare a new consumer
232                consumer = new QueueingConsumer(channel);
233                channel.basicConsume(queue, autoAck, consumer);
234                channel.basicConsume(multiQueue, autoAck, consumer);
235        }
236
237        public void kill() throws IOException {
238                logger.info("Killing objectmq: " + UID + " thread id");
239                killed = true;
240                interrupt();
241                channel.close();
242        }
243
244        public RemoteObject getObj() {
245                return obj;
246        }
247
248        public void setObj(RemoteObject obj) {
249                this.obj = obj;
250        }
251
252        public long getLastExecution() {
253                return lastExec;
254        }
255
256        public boolean isIdle() {
257                return idle;
258        }
259
260}
Note: See TracBrowser for help on using the repository browser.