source: branches/supervisor/src/main/java/omq/server/InvocationThread.java @ 100

Last change on this file since 100 was 100, checked in by stoda, 11 years ago

Idea of thread supervisor done

TODO test the idea, change the remote properties, etc

File size: 7.7 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationTargetException;
5import java.util.Properties;
6
7import omq.common.broker.Broker;
8import omq.common.message.Request;
9import omq.common.message.Response;
10import omq.common.util.ParameterQueue;
11import omq.common.util.Serializer;
12import omq.exception.OmqException;
13import omq.exception.SerializerException;
14
15import org.apache.log4j.Logger;
16
17import com.rabbitmq.client.AMQP.BasicProperties;
18import com.rabbitmq.client.Channel;
19import com.rabbitmq.client.ConsumerCancelledException;
20import com.rabbitmq.client.QueueingConsumer;
21import com.rabbitmq.client.QueueingConsumer.Delivery;
22import com.rabbitmq.client.ShutdownSignalException;
23
24/**
25 * An invocationThread waits for requests an invokes them.
26 *
27 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
28 *
29 */
30public class InvocationThread extends Thread {
31
32        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
33        private static final String multi = "multi#";
34
35        // RemoteObject
36        private RemoteObject obj;
37        private String UID;
38        private Properties env;
39        private boolean idle;
40        private long lastExec;
41
42        private RemoteThreadPool pool; // TODO posar això bé
43
44        // Broker
45        private Broker broker;
46        private Serializer serializer;
47
48        // Consumer
49        private Channel channel;
50        private QueueingConsumer consumer;
51        private String multiQueue;
52        private boolean killed = false;
53
54        public InvocationThread(RemoteObject obj, Broker broker) throws Exception {
55                this.obj = obj;
56                this.UID = obj.getRef();
57                this.env = obj.getEnv();
58                this.broker = broker;
59                this.serializer = broker.getSerializer();
60                this.lastExec = 0;
61                this.idle = true;
62        }
63
64        @Override
65        public synchronized void start() {
66                try {
67                        startQueues();
68                        super.start();
69                } catch (Exception e) {
70                        logger.error("Cannot start a remoteObject", e);
71                }
72
73        }
74
75        @Override
76        public void run() {
77                while (!killed) {
78                        try {
79                                // Get the delivery
80                                Delivery delivery = consumer.nextDelivery();
81
82                                // This thread gets busy
83                                pool.getBusy().incrementAndGet();
84                                idle = false;
85
86                                String serializerType = delivery.getProperties().getType();
87
88                                // Deserialize the json
89                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
90                                String methodName = request.getMethod();
91                                String requestID = request.getId();
92
93                                logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType);
94
95                                // Invoke the method
96                                Object result = null;
97                                OmqException error = null;
98                                try {
99                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
100                                } catch (InvocationTargetException e) {
101                                        Throwable throwable = e.getTargetException();
102                                        logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
103                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
104                                } catch (NoSuchMethodException e) {
105                                        logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
106                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
107                                }
108
109                                // Reply if it's necessary
110                                if (!request.isAsync()) {
111                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
112
113                                        BasicProperties props = delivery.getProperties();
114
115                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
116
117                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
118                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
119                                        logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: "
120                                                        + props.getReplyTo());
121                                }
122
123                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
124
125                                // The thread is now idle
126                                lastExec = System.currentTimeMillis();
127                                idle = true;
128                                pool.getBusy().decrementAndGet();
129
130                        } catch (InterruptedException i) {
131                                logger.error(i);
132                        } catch (ShutdownSignalException e) {
133                                logger.error(e);
134                                try {
135                                        if (channel.isOpen()) {
136                                                channel.close();
137                                        }
138                                        startQueues();
139                                } catch (Exception e1) {
140                                        try {
141                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
142                                                Thread.sleep(milis);
143                                        } catch (InterruptedException e2) {
144                                                logger.error(e2);
145                                        }
146                                        logger.error(e1);
147                                }
148                        } catch (ConsumerCancelledException e) {
149                                logger.error(e);
150                        } catch (SerializerException e) {
151                                logger.error(e);
152                        } catch (Exception e) {
153                                logger.error(e);
154                        }
155
156                }
157        }
158
159        /**
160         * This method starts the queues using the information got in the
161         * environment.
162         *
163         * @throws Exception
164         */
165        private void startQueues() throws Exception {
166                // Start channel
167                channel = broker.getNewChannel();
168
169                /*
170                 * Default queue, Round Robin behaviour
171                 */
172
173                // Get info about which exchange and queue will use
174                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
175                String queue = UID;
176                String routingKey = UID;
177
178                // RemoteObject default queue
179                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
180                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
181                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
182
183                // Declares and bindings
184                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
185                        channel.exchangeDeclare(exchange, "direct");
186                }
187                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
188                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
189                        channel.queueBind(queue, exchange, routingKey);
190                }
191                logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
192                                + exclusive + ", AutoDelete: " + autoDelete);
193
194                /*
195                 * Multi queue, exclusive per each instance
196                 */
197
198                // Get info about the multiQueue
199                String multiExchange = multi + UID;
200                // TODO:String multiExchange = multi + exchange + UID;
201                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
202
203                // Multi queue (exclusive queue per remoteObject)
204                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
205                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
206                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
207
208                // Declares and bindings
209                channel.exchangeDeclare(multiExchange, "fanout");
210                if (multiQueue == null) {
211                        multiQueue = channel.queueDeclare().getQueue();
212                } else {
213                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
214                }
215                channel.queueBind(multiQueue, multiExchange, "");
216                logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
217                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
218
219                /*
220                 * Consumer
221                 */
222
223                boolean autoAck = false;
224
225                int prefetchCount = 1;
226                channel.basicQos(prefetchCount);
227
228                // Declare a new consumer
229                consumer = new QueueingConsumer(channel);
230                channel.basicConsume(queue, autoAck, consumer);
231                channel.basicConsume(multiQueue, autoAck, consumer);
232        }
233
234        public void kill() throws IOException {
235                logger.info("Killing objectmq: " + UID + " thread id");
236                killed = true;
237                interrupt();
238                channel.close();
239        }
240
241        public RemoteObject getObj() {
242                return obj;
243        }
244
245        public void setObj(RemoteObject obj) {
246                this.obj = obj;
247        }
248
249        public long getLastExecution() {
250                return lastExec;
251        }
252
253        public boolean isIdle() {
254                return idle;
255        }
256
257}
Note: See TracBrowser for help on using the repository browser.