source: trunk/src/main/java/omq/server/InvocationThread.java @ 49

Last change on this file since 49 was 49, checked in by stoda, 11 years ago

log4j added

File size: 3.1 KB
Line 
1package omq.server;
2
3import java.lang.reflect.InvocationTargetException;
4import java.util.concurrent.BlockingQueue;
5
6import omq.common.message.Request;
7import omq.common.message.Response;
8import omq.common.util.Serializer;
9import omq.exception.OmqException;
10
11import org.apache.log4j.Logger;
12
13import com.rabbitmq.client.AMQP.BasicProperties;
14import com.rabbitmq.client.Channel;
15import com.rabbitmq.client.QueueingConsumer.Delivery;
16
17/**
18 *
19 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
20 *
21 */
22public class InvocationThread extends Thread {
23        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
24        private RemoteObject obj;
25        private BlockingQueue<Delivery> deliveryQueue;
26        private boolean killed = false;
27
28        public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue) {
29                this.obj = obj;
30                this.deliveryQueue = deliveryQueue;
31        }
32
33        @Override
34        public void run() {
35                while (!killed) {
36                        try {
37                                // Get the delivery
38                                Delivery delivery = deliveryQueue.take();
39
40                                String serializerType = delivery.getProperties().getType();
41
42                                // Deserialize the json
43                                Request request = Serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
44                                // Log.saveLog("Server-Deserialize", delivery.getBody());
45
46                                String methodName = request.getMethod();
47                                String requestID = request.getId();
48
49                                logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID);
50
51                                // Invoke the method
52                                Object result = null;
53                                OmqException error = null;
54                                try {
55                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
56                                } catch (InvocationTargetException e) {
57                                        Throwable throwable = e.getTargetException();
58                                        logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
59                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
60                                } catch (NoSuchMethodException e) {
61                                        logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
62                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
63                                }
64
65                                // Reply if it's necessary
66                                if (!request.isAsync()) {
67                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
68
69                                        Channel channel = obj.getChannel();
70
71                                        BasicProperties props = delivery.getProperties();
72
73                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
74
75                                        byte[] bytesResponse = Serializer.serialize(serializerType, resp);
76                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
77
78                                        // Log.saveLog("Server-Serialize", bytesResponse);
79                                }
80
81                        } catch (InterruptedException i) {
82                                logger.error(i);
83                                killed = true;
84                        } catch (Exception e) {
85                                logger.error("Object: " + obj.getRef(), e);
86                        }
87
88                }
89        }
90
91        public RemoteObject getObj() {
92                return obj;
93        }
94
95        public void setObj(RemoteObject obj) {
96                this.obj = obj;
97        }
98
99        public BlockingQueue<Delivery> getDeliveryQueue() {
100                return deliveryQueue;
101        }
102
103        public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
104                this.deliveryQueue = deliveryQueue;
105        }
106}
Note: See TracBrowser for help on using the repository browser.