source: branches/objectmq_old/src/omq/server/InvocationThread.java @ 112

Last change on this file since 112 was 39, checked in by stoda, 11 years ago

Exception test revised.
Broker.lookup does not need the casting
GsonImp? arguments problem solved
MultiProcessTest? added

File size: 2.8 KB
Line 
1package omq.server;
2
3import java.lang.reflect.InvocationTargetException;
4import java.util.concurrent.BlockingQueue;
5
6import omq.common.message.Request;
7import omq.common.message.Response;
8import omq.common.util.Serializer;
9import omq.exception.OmqException;
10
11import com.rabbitmq.client.AMQP.BasicProperties;
12import com.rabbitmq.client.Channel;
13import com.rabbitmq.client.QueueingConsumer.Delivery;
14
15/**
16 *
17 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
18 *
19 */
20public class InvocationThread extends Thread {
21        private RemoteObject obj;
22        private BlockingQueue<Delivery> deliveryQueue;
23        private boolean killed = false;
24
25        public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue) {
26                this.obj = obj;
27                this.deliveryQueue = deliveryQueue;
28        }
29
30        @Override
31        public void run() {
32                while (!killed) {
33                        try {
34                                // Get the delivery
35                                Delivery delivery = deliveryQueue.take();
36
37                                // Deserialize the json
38                                Request request = Serializer.deserializeRequest(delivery.getBody(), obj);
39                                // Log.saveLog("Server-Deserialize", delivery.getBody());
40
41                                String methodName = request.getMethod();
42                                String requestID = request.getId();
43
44                                System.out.println("Invoke method: " + methodName + " CorrID: " + requestID);
45
46                                // Invoke the method
47                                Object result = null;
48                                OmqException error = null;
49                                try {
50                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
51                                } catch (InvocationTargetException e) {
52                                        Throwable throwable = e.getTargetException();
53                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
54                                } catch (NoSuchMethodException e) {
55                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
56                                }
57
58                                // Reply if it's necessary
59                                if (!request.isAsync()) {
60                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
61
62                                        Channel channel = obj.getChannel();
63
64                                        BasicProperties props = delivery.getProperties();
65
66                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
67
68                                        byte[] bytesResponse = Serializer.serialize(resp);
69                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
70
71                                        // Log.saveLog("Server-Serialize", bytesResponse);
72                                }
73
74                        } catch (InterruptedException i) {
75                                i.printStackTrace();
76                                killed = true;
77                        } catch (Exception e) {
78                                System.out.println("Error a l'Invocation Thread \nException: " + e);
79                                e.printStackTrace();
80                        }
81
82                }
83        }
84
85        public RemoteObject getObj() {
86                return obj;
87        }
88
89        public void setObj(RemoteObject obj) {
90                this.obj = obj;
91        }
92
93        public BlockingQueue<Delivery> getDeliveryQueue() {
94                return deliveryQueue;
95        }
96
97        public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
98                this.deliveryQueue = deliveryQueue;
99        }
100}
Note: See TracBrowser for help on using the repository browser.