source: trunk/src/main/java/omq/server/InvocationThread.java @ 83

Last change on this file since 83 was 83, checked in by stoda, 11 years ago

J

File size: 3.4 KB
Line 
1package omq.server;
2
3import java.lang.reflect.InvocationTargetException;
4import java.util.concurrent.BlockingQueue;
5
6import omq.common.message.Request;
7import omq.common.message.Response;
8import omq.common.util.Serializer;
9import omq.exception.OmqException;
10
11import org.apache.log4j.Logger;
12
13import com.rabbitmq.client.AMQP.BasicProperties;
14import com.rabbitmq.client.Channel;
15import com.rabbitmq.client.QueueingConsumer.Delivery;
16
17/**
18 * An invocationThread waits for requests an invokes them.
19 *
20 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
21 *
22 */
23public class InvocationThread extends Thread {
24        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
25        private RemoteObject obj;
26        private transient Serializer serializer;
27        private BlockingQueue<Delivery> deliveryQueue;
28        private boolean killed = false;
29
30        public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue, Serializer serializer) {
31                this.obj = obj;
32                this.deliveryQueue = deliveryQueue;
33                this.serializer = serializer;
34        }
35
36        @Override
37        public void run() {
38                while (!killed) {
39                        try {
40                                // Get the delivery
41                                Delivery delivery = deliveryQueue.take();
42
43                                String serializerType = delivery.getProperties().getType();
44
45                                // Deserialize the json
46                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
47                                String methodName = request.getMethod();
48                                String requestID = request.getId();
49
50                                logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType);
51
52                                // Invoke the method
53                                Object result = null;
54                                OmqException error = null;
55                                try {
56                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
57                                } catch (InvocationTargetException e) {
58                                        Throwable throwable = e.getTargetException();
59                                        logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
60                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
61                                } catch (NoSuchMethodException e) {
62                                        logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
63                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
64                                }
65
66                                // Reply if it's necessary
67                                if (!request.isAsync()) {
68                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
69
70                                        Channel channel = obj.getChannel();
71
72                                        BasicProperties props = delivery.getProperties();
73
74                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
75
76                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
77                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
78                                        logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: "
79                                                        + props.getReplyTo());
80                                }
81
82                        } catch (InterruptedException i) {
83                                logger.error(i);
84                                killed = true;
85                        } catch (Exception e) {
86                                logger.error("Object: " + obj.getRef(), e);
87                        }
88
89                }
90        }
91
92        public RemoteObject getObj() {
93                return obj;
94        }
95
96        public void setObj(RemoteObject obj) {
97                this.obj = obj;
98        }
99
100        public BlockingQueue<Delivery> getDeliveryQueue() {
101                return deliveryQueue;
102        }
103
104        public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
105                this.deliveryQueue = deliveryQueue;
106        }
107}
Note: See TracBrowser for help on using the repository browser.