source: branches/supervisor/src/main/java/omq/server/InvocationThread.java @ 106

Last change on this file since 106 was 106, checked in by stoda, 11 years ago

abans que la segueixi liant...

File size: 8.3 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationTargetException;
5import java.util.Properties;
6
7import omq.common.broker.Broker;
8import omq.common.message.Request;
9import omq.common.message.Response;
10import omq.common.util.ParameterQueue;
11import omq.common.util.Serializer;
12import omq.exception.OmqException;
13import omq.exception.SerializerException;
14
15import org.apache.log4j.Logger;
16
17import com.rabbitmq.client.AMQP.BasicProperties;
18import com.rabbitmq.client.Channel;
19import com.rabbitmq.client.ConsumerCancelledException;
20import com.rabbitmq.client.QueueingConsumer;
21import com.rabbitmq.client.QueueingConsumer.Delivery;
22import com.rabbitmq.client.ShutdownSignalException;
23
24/**
25 * An invocationThread waits for requests an invokes them.
26 *
27 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
28 *
29 */
30public class InvocationThread extends Thread {
31
32        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
33        private static final String multi = "multi#";
34
35        // RemoteObject
36        private RemoteObject obj;
37        private String reference;
38        private String UID;
39        private Properties env;
40        private boolean idle;
41        private long lastExec;
42
43        private RemoteThreadPool pool;
44
45        // Broker
46        private Broker broker;
47        private Serializer serializer;
48
49        // Consumer
50        private Channel channel;
51        private QueueingConsumer consumer;
52        private String multiQueue;
53        private boolean killed = false;
54
55        public InvocationThread(RemoteObject obj) throws Exception {
56                this.obj = obj;
57                this.UID = obj.getUID();
58                this.reference = obj.getRef();
59                this.env = obj.getEnv();
60                this.broker = obj.getBroker();
61                this.pool = obj.getPool();
62                this.serializer = broker.getSerializer();
63                this.lastExec = 0;
64                this.idle = true;
65        }
66
67        @Override
68        public synchronized void start() {
69                try {
70                        startQueues();
71                        super.start();
72                } catch (Exception e) {
73                        logger.error("Cannot start a remoteObject", e);
74                }
75
76        }
77
78        @Override
79        public void run() {
80                while (!killed) {
81                        try {
82                                // Get the delivery
83                                Delivery delivery = consumer.nextDelivery();
84
85                                // This thread gets busy
86                                pool.getBusy().incrementAndGet();
87                                idle = false;
88
89                                String serializerType = delivery.getProperties().getType();
90
91                                // Deserialize the request
92                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
93                                String methodName = request.getMethod();
94                                String requestID = request.getId();
95
96                                logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType);
97
98                                // Invoke the method
99                                Object result = null;
100                                OmqException error = null;
101                                try {
102                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
103                                } catch (InvocationTargetException e) {
104                                        Throwable throwable = e.getTargetException();
105                                        logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
106                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
107                                } catch (NoSuchMethodException e) {
108                                        logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
109                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
110                                }
111
112                                // Reply if it's necessary
113                                if (!request.isAsync()) {
114                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
115
116                                        BasicProperties props = delivery.getProperties();
117
118                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
119
120                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
121                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
122                                        logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: "
123                                                        + props.getReplyTo());
124                                }
125
126                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
127
128                                // The thread is now idle
129                                lastExec = System.currentTimeMillis();
130                                idle = true;
131                                pool.getBusy().decrementAndGet();
132
133                        } catch (InterruptedException i) {
134                                logger.error(i);
135                        } catch (ShutdownSignalException e) {
136                                logger.error(e);
137                                try {
138                                        if (channel.isOpen()) {
139                                                channel.close();
140                                        }
141                                        startQueues();
142                                } catch (Exception e1) {
143                                        try {
144                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
145                                                Thread.sleep(milis);
146                                        } catch (InterruptedException e2) {
147                                                logger.error(e2);
148                                        }
149                                        logger.error(e1);
150                                }
151                        } catch (ConsumerCancelledException e) {
152                                logger.error(e);
153                        } catch (SerializerException e) {
154                                logger.error(e);
155                        } catch (Exception e) {
156                                e.printStackTrace();
157                                logger.error(e);
158                        }
159
160                }
161                logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed");
162        }
163
164        /**
165         * This method starts the queues using the information got in the
166         * environment.
167         *
168         * @throws Exception
169         */
170        private void startQueues() throws Exception {
171                // Start channel
172                channel = broker.getNewChannel();
173
174                /*
175                 * Default queue, Round Robin behaviour
176                 */
177
178                // Get info about which exchange and queue will use
179                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
180                String queue = reference;
181                String routingKey = reference;
182
183                // RemoteObject default queue
184                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
185                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
186                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
187
188                // Declares and bindings
189                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
190                        channel.exchangeDeclare(exchange, "direct");
191                }
192                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
193                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
194                        channel.queueBind(queue, exchange, routingKey);
195                }
196                logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
197                                + exclusive + ", AutoDelete: " + autoDelete);
198
199                /*
200                 * UID queue
201                 */
202
203                if (UID != null) {
204
205                        boolean uidDurable = false;
206                        boolean uidExclusive = true;
207                        boolean uidAutoDelete = true;
208
209                        channel.queueDeclare(UID, uidDurable, uidExclusive, uidAutoDelete, null);
210                        if (!exchange.equalsIgnoreCase("")) { // Default exchange case
211                                channel.queueBind(UID, exchange, UID);
212                        }
213                }
214
215                /*
216                 * Multi queue, exclusive per each instance
217                 */
218
219                // Get info about the multiQueue
220                String multiExchange = multi + reference;
221                // TODO:String multiExchange = multi + exchange + reference;
222                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
223
224                // Multi queue (exclusive queue per remoteObject)
225                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
226                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
227                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
228
229                // Declares and bindings
230                channel.exchangeDeclare(multiExchange, "fanout");
231                if (multiQueue == null) {
232                        multiQueue = channel.queueDeclare().getQueue();
233                } else {
234                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
235                }
236                channel.queueBind(multiQueue, multiExchange, "");
237                logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
238                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
239
240                /*
241                 * Consumer
242                 */
243
244                boolean autoAck = false;
245
246                int prefetchCount = 1;
247                channel.basicQos(prefetchCount);
248
249                // Declare a new consumer
250                consumer = new QueueingConsumer(channel);
251                channel.basicConsume(queue, autoAck, consumer);
252                channel.basicConsume(multiQueue, autoAck, consumer);
253                if (UID != null) {
254                        channel.basicConsume(UID, autoAck, consumer);
255                }
256        }
257
258        public void kill() throws IOException {
259                logger.info("Killing objectmq: " + reference + " thread id");
260                killed = true;
261                interrupt();
262                channel.close();
263        }
264
265        public RemoteObject getObj() {
266                return obj;
267        }
268
269        public void setObj(RemoteObject obj) {
270                this.obj = obj;
271        }
272
273        public long getLastExecution() {
274                return lastExec;
275        }
276
277        public boolean isIdle() {
278                return idle;
279        }
280
281}
Note: See TracBrowser for help on using the repository browser.