source: trunk/src/main/java/omq/server/InvocationThread.java @ 44

Last change on this file since 44 was 44, checked in by stoda, 11 years ago

Objectmq converted to maven project

File size: 2.8 KB
Line 
1package omq.server;
2
3import java.lang.reflect.InvocationTargetException;
4import java.util.concurrent.BlockingQueue;
5
6import omq.common.message.Request;
7import omq.common.message.Response;
8import omq.common.util.Serializer;
9import omq.exception.OmqException;
10
11import com.rabbitmq.client.AMQP.BasicProperties;
12import com.rabbitmq.client.Channel;
13import com.rabbitmq.client.QueueingConsumer.Delivery;
14
15/**
16 *
17 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
18 *
19 */
20public class InvocationThread extends Thread {
21        private RemoteObject obj;
22        private BlockingQueue<Delivery> deliveryQueue;
23        private boolean killed = false;
24
25        public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue) {
26                this.obj = obj;
27                this.deliveryQueue = deliveryQueue;
28        }
29
30        @Override
31        public void run() {
32                while (!killed) {
33                        try {
34                                // Get the delivery
35                                Delivery delivery = deliveryQueue.take();
36
37                                // Deserialize the json
38                                Request request = Serializer.deserializeRequest(delivery.getBody(), obj);
39                                // Log.saveLog("Server-Deserialize", delivery.getBody());
40
41                                String methodName = request.getMethod();
42                                String requestID = request.getId();
43
44                                System.out.println("Invoke method: " + methodName + " CorrID: " + requestID);
45
46                                // Invoke the method
47                                Object result = null;
48                                OmqException error = null;
49                                try {
50                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
51                                } catch (InvocationTargetException e) {
52                                        Throwable throwable = e.getTargetException();
53                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
54                                } catch (NoSuchMethodException e) {
55                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
56                                }
57
58                                // Reply if it's necessary
59                                if (!request.isAsync()) {
60                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
61
62                                        Channel channel = obj.getChannel();
63
64                                        BasicProperties props = delivery.getProperties();
65
66                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
67
68                                        byte[] bytesResponse = Serializer.serialize(resp);
69                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
70
71                                        // Log.saveLog("Server-Serialize", bytesResponse);
72                                }
73
74                        } catch (InterruptedException i) {
75                                i.printStackTrace();
76                                killed = true;
77                        } catch (Exception e) {
78                                System.out.println("Error a l'Invocation Thread \nException: " + e);
79                                e.printStackTrace();
80                        }
81
82                }
83        }
84
85        public RemoteObject getObj() {
86                return obj;
87        }
88
89        public void setObj(RemoteObject obj) {
90                this.obj = obj;
91        }
92
93        public BlockingQueue<Delivery> getDeliveryQueue() {
94                return deliveryQueue;
95        }
96
97        public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
98                this.deliveryQueue = deliveryQueue;
99        }
100}
Note: See TracBrowser for help on using the repository browser.