source: trunk/src/main/java/omq/server/InvocationThread.java @ 47

Last change on this file since 47 was 47, checked in by stoda, 11 years ago

Refactoring Environment class - deleted.
StopBroker? problems solved (?)
Server can receive send and receive messages in different formats.
Some tests modified

TODO: finish all the tests, add log4j

File size: 2.9 KB
Line 
1package omq.server;
2
3import java.lang.reflect.InvocationTargetException;
4import java.util.concurrent.BlockingQueue;
5
6import omq.common.message.Request;
7import omq.common.message.Response;
8import omq.common.util.Serializer;
9import omq.exception.OmqException;
10
11import com.rabbitmq.client.AMQP.BasicProperties;
12import com.rabbitmq.client.Channel;
13import com.rabbitmq.client.QueueingConsumer.Delivery;
14
15/**
16 *
17 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
18 *
19 */
20public class InvocationThread extends Thread {
21        private RemoteObject obj;
22        private BlockingQueue<Delivery> deliveryQueue;
23        private boolean killed = false;
24
25        public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue) {
26                this.obj = obj;
27                this.deliveryQueue = deliveryQueue;
28        }
29
30        @Override
31        public void run() {
32                while (!killed) {
33                        try {
34                                // Get the delivery
35                                Delivery delivery = deliveryQueue.take();
36
37                                String serializerType = delivery.getProperties().getType();
38
39                                // Deserialize the json
40                                Request request = Serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
41                                // Log.saveLog("Server-Deserialize", delivery.getBody());
42
43                                String methodName = request.getMethod();
44                                String requestID = request.getId();
45
46                                System.out.println("Invoke method: " + methodName + " CorrID: " + requestID);
47
48                                // Invoke the method
49                                Object result = null;
50                                OmqException error = null;
51                                try {
52                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
53                                } catch (InvocationTargetException e) {
54                                        Throwable throwable = e.getTargetException();
55                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
56                                } catch (NoSuchMethodException e) {
57                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
58                                }
59
60                                // Reply if it's necessary
61                                if (!request.isAsync()) {
62                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
63
64                                        Channel channel = obj.getChannel();
65
66                                        BasicProperties props = delivery.getProperties();
67
68                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
69
70                                        byte[] bytesResponse = Serializer.serialize(serializerType, resp);
71                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
72
73                                        // Log.saveLog("Server-Serialize", bytesResponse);
74                                }
75
76                        } catch (InterruptedException i) {
77                                i.printStackTrace();
78                                killed = true;
79                        } catch (Exception e) {
80                                System.out.println("Error a l'Invocation Thread \nException: " + e);
81                                e.printStackTrace();
82                        }
83
84                }
85        }
86
87        public RemoteObject getObj() {
88                return obj;
89        }
90
91        public void setObj(RemoteObject obj) {
92                this.obj = obj;
93        }
94
95        public BlockingQueue<Delivery> getDeliveryQueue() {
96                return deliveryQueue;
97        }
98
99        public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
100                this.deliveryQueue = deliveryQueue;
101        }
102}
Note: See TracBrowser for help on using the repository browser.