source: branches/supervisor/src/main/java/omq/server/InvocationThread.java @ 106

Last change on this file since 106 was 106, checked in by stoda, 11 years ago

abans que la segueixi liant...

File size: 8.3 KB
RevLine 
[44]1package omq.server;
2
[96]3import java.io.IOException;
[44]4import java.lang.reflect.InvocationTargetException;
[96]5import java.util.Properties;
[44]6
[96]7import omq.common.broker.Broker;
[44]8import omq.common.message.Request;
9import omq.common.message.Response;
[96]10import omq.common.util.ParameterQueue;
[44]11import omq.common.util.Serializer;
12import omq.exception.OmqException;
[96]13import omq.exception.SerializerException;
[44]14
[49]15import org.apache.log4j.Logger;
16
[44]17import com.rabbitmq.client.AMQP.BasicProperties;
18import com.rabbitmq.client.Channel;
[96]19import com.rabbitmq.client.ConsumerCancelledException;
20import com.rabbitmq.client.QueueingConsumer;
[44]21import com.rabbitmq.client.QueueingConsumer.Delivery;
[96]22import com.rabbitmq.client.ShutdownSignalException;
[44]23
24/**
[83]25 * An invocationThread waits for requests an invokes them.
[44]26 *
27 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
28 *
29 */
30public class InvocationThread extends Thread {
[96]31
[49]32        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
[96]33        private static final String multi = "multi#";
34
35        // RemoteObject
[44]36        private RemoteObject obj;
[105]37        private String reference;
[106]38        private String UID;
[96]39        private Properties env;
[100]40        private boolean idle;
41        private long lastExec;
[96]42
[101]43        private RemoteThreadPool pool;
[100]44
[96]45        // Broker
46        private Broker broker;
[91]47        private Serializer serializer;
[96]48
49        // Consumer
50        private Channel channel;
51        private QueueingConsumer consumer;
52        private String multiQueue;
[44]53        private boolean killed = false;
54
[101]55        public InvocationThread(RemoteObject obj) throws Exception {
[44]56                this.obj = obj;
[106]57                this.UID = obj.getUID();
[105]58                this.reference = obj.getRef();
[96]59                this.env = obj.getEnv();
[101]60                this.broker = obj.getBroker();
61                this.pool = obj.getPool();
[96]62                this.serializer = broker.getSerializer();
[100]63                this.lastExec = 0;
64                this.idle = true;
[44]65        }
66
67        @Override
[96]68        public synchronized void start() {
69                try {
70                        startQueues();
71                        super.start();
72                } catch (Exception e) {
73                        logger.error("Cannot start a remoteObject", e);
74                }
75
76        }
77
78        @Override
[44]79        public void run() {
80                while (!killed) {
81                        try {
82                                // Get the delivery
[96]83                                Delivery delivery = consumer.nextDelivery();
[44]84
[100]85                                // This thread gets busy
86                                pool.getBusy().incrementAndGet();
87                                idle = false;
88
[47]89                                String serializerType = delivery.getProperties().getType();
90
[106]91                                // Deserialize the request
[53]92                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
[44]93                                String methodName = request.getMethod();
94                                String requestID = request.getId();
95
[63]96                                logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType);
[44]97
98                                // Invoke the method
99                                Object result = null;
100                                OmqException error = null;
101                                try {
102                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
103                                } catch (InvocationTargetException e) {
104                                        Throwable throwable = e.getTargetException();
[49]105                                        logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
[44]106                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
107                                } catch (NoSuchMethodException e) {
[49]108                                        logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
[44]109                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
110                                }
111
112                                // Reply if it's necessary
113                                if (!request.isAsync()) {
114                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
115
116                                        BasicProperties props = delivery.getProperties();
117
118                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
119
[53]120                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
[44]121                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
[54]122                                        logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: "
123                                                        + props.getReplyTo());
[44]124                                }
[96]125
[91]126                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
[100]127
128                                // The thread is now idle
129                                lastExec = System.currentTimeMillis();
130                                idle = true;
131                                pool.getBusy().decrementAndGet();
132
[44]133                        } catch (InterruptedException i) {
[49]134                                logger.error(i);
[96]135                        } catch (ShutdownSignalException e) {
136                                logger.error(e);
137                                try {
138                                        if (channel.isOpen()) {
139                                                channel.close();
140                                        }
141                                        startQueues();
142                                } catch (Exception e1) {
143                                        try {
144                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
145                                                Thread.sleep(milis);
146                                        } catch (InterruptedException e2) {
147                                                logger.error(e2);
148                                        }
149                                        logger.error(e1);
150                                }
151                        } catch (ConsumerCancelledException e) {
152                                logger.error(e);
153                        } catch (SerializerException e) {
154                                logger.error(e);
[44]155                        } catch (Exception e) {
[101]156                                e.printStackTrace();
[96]157                                logger.error(e);
[44]158                        }
159
160                }
[102]161                logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed");
[44]162        }
163
[96]164        /**
165         * This method starts the queues using the information got in the
166         * environment.
167         *
168         * @throws Exception
169         */
170        private void startQueues() throws Exception {
171                // Start channel
172                channel = broker.getNewChannel();
173
174                /*
175                 * Default queue, Round Robin behaviour
176                 */
177
178                // Get info about which exchange and queue will use
179                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
[105]180                String queue = reference;
181                String routingKey = reference;
[96]182
183                // RemoteObject default queue
184                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
185                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
186                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
187
188                // Declares and bindings
189                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
190                        channel.exchangeDeclare(exchange, "direct");
191                }
192                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
193                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
194                        channel.queueBind(queue, exchange, routingKey);
195                }
[105]196                logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
[96]197                                + exclusive + ", AutoDelete: " + autoDelete);
198
199                /*
[106]200                 * UID queue
201                 */
202
203                if (UID != null) {
204
205                        boolean uidDurable = false;
206                        boolean uidExclusive = true;
207                        boolean uidAutoDelete = true;
208
209                        channel.queueDeclare(UID, uidDurable, uidExclusive, uidAutoDelete, null);
210                        if (!exchange.equalsIgnoreCase("")) { // Default exchange case
211                                channel.queueBind(UID, exchange, UID);
212                        }
213                }
214
215                /*
[96]216                 * Multi queue, exclusive per each instance
217                 */
218
219                // Get info about the multiQueue
[105]220                String multiExchange = multi + reference;
221                // TODO:String multiExchange = multi + exchange + reference;
[96]222                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
223
224                // Multi queue (exclusive queue per remoteObject)
225                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
226                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
227                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
228
229                // Declares and bindings
230                channel.exchangeDeclare(multiExchange, "fanout");
231                if (multiQueue == null) {
232                        multiQueue = channel.queueDeclare().getQueue();
233                } else {
234                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
235                }
236                channel.queueBind(multiQueue, multiExchange, "");
[105]237                logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
[96]238                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
239
240                /*
241                 * Consumer
242                 */
243
244                boolean autoAck = false;
245
246                int prefetchCount = 1;
247                channel.basicQos(prefetchCount);
248
249                // Declare a new consumer
250                consumer = new QueueingConsumer(channel);
251                channel.basicConsume(queue, autoAck, consumer);
252                channel.basicConsume(multiQueue, autoAck, consumer);
[106]253                if (UID != null) {
254                        channel.basicConsume(UID, autoAck, consumer);
255                }
[96]256        }
257
258        public void kill() throws IOException {
[105]259                logger.info("Killing objectmq: " + reference + " thread id");
[96]260                killed = true;
261                interrupt();
262                channel.close();
263        }
264
[44]265        public RemoteObject getObj() {
266                return obj;
267        }
268
269        public void setObj(RemoteObject obj) {
270                this.obj = obj;
271        }
272
[100]273        public long getLastExecution() {
274                return lastExec;
275        }
276
277        public boolean isIdle() {
278                return idle;
279        }
280
[44]281}
Note: See TracBrowser for help on using the repository browser.