source: branches/objectmq-1.0/src/omq/server/remote/request/InvocationThread.java @ 33

Last change on this file since 33 was 33, checked in by amoreno, 11 years ago

new release version

File size: 2.7 KB
Line 
1package omq.server.remote.request;
2
3import java.util.concurrent.BlockingQueue;
4
5import omq.common.message.Request;
6import omq.common.message.Response;
7import omq.common.util.Serializer;
8
9import com.rabbitmq.client.AMQP.BasicProperties;
10import com.rabbitmq.client.Channel;
11import com.rabbitmq.client.QueueingConsumer.Delivery;
12
13/**
14 *
15 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
16 *
17 */
18public class InvocationThread extends Thread {
19        private RemoteObject obj;
20        private BlockingQueue<Delivery> deliveryQueue;
21        private boolean killed = false;
22
23        public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue) {
24                this.obj = obj;
25                this.deliveryQueue = deliveryQueue;
26        }
27
28        @Override
29        public void run() {
30                while (!killed) {
31                        try {
32                                // Get the delivery
33                                Delivery delivery = deliveryQueue.take();
34
35                                // Deserialize the json
36                                Request request = Serializer.deserializeRequest(delivery.getBody(), obj);
37                                //Log.saveLog("Server-Deserialize", delivery.getBody());
38
39                                String methodName = request.getMethod();
40                                String requestID = request.getId();
41
42                                System.out.println("Invoke method: " + methodName + " CorrID: " + requestID);
43
44                                // Changed ---------------------------------------
45                                Object result = null;
46                                if ("commit".equalsIgnoreCase(methodName)) {
47                                        Object[] arguments = request.getArguments();
48                                        arguments[1] = ((String) arguments[1]) + "@@" + requestID;
49                                        result = obj.invokeMethod(methodName, arguments);
50                                } else {
51                                        result = obj.invokeMethod(request.getMethod(), request.getArguments());
52                                }
53                                // -----------------------------------------------
54
55                                // // Invoke the method
56                                // Object result = obj.invokeMethod(request.getMethod(),
57                                // request.getArguments());
58
59                                if (!request.isAsync()) {
60                                        Response resp = new Response(request.getId(), obj.getRef(), result);
61
62                                        Channel channel = obj.getChannel();
63
64                                        BasicProperties props = delivery.getProperties();
65
66                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
67
68                                        byte[] bytesResponse = Serializer.serialize(resp);
69                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
70
71                                        //Log.saveLog("Server-Serialize", bytesResponse);
72                                }
73
74                        } catch (InterruptedException i) {
75                                i.printStackTrace();
76                                killed = true;
77                        } catch (Exception e) {
78                                System.out.println("Error a l'Invocation Thread \nException: " + e);
79                                e.printStackTrace();
80                        }
81
82                }
83        }
84
85        public RemoteObject getObj() {
86                return obj;
87        }
88
89        public void setObj(RemoteObject obj) {
90                this.obj = obj;
91        }
92
93        public BlockingQueue<Delivery> getDeliveryQueue() {
94                return deliveryQueue;
95        }
96
97        public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
98                this.deliveryQueue = deliveryQueue;
99        }
100}
Note: See TracBrowser for help on using the repository browser.