source: trunk/src/main/java/omq/server/InvocationThread.java @ 53

Last change on this file since 53 was 53, checked in by stoda, 11 years ago

Non static broker
TODO: change all test to see whether the new broker configuration works

File size: 3.2 KB
RevLine 
[44]1package omq.server;
2
3import java.lang.reflect.InvocationTargetException;
4import java.util.concurrent.BlockingQueue;
5
6import omq.common.message.Request;
7import omq.common.message.Response;
8import omq.common.util.Serializer;
9import omq.exception.OmqException;
10
[49]11import org.apache.log4j.Logger;
12
[44]13import com.rabbitmq.client.AMQP.BasicProperties;
14import com.rabbitmq.client.Channel;
15import com.rabbitmq.client.QueueingConsumer.Delivery;
16
17/**
18 *
19 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
20 *
21 */
22public class InvocationThread extends Thread {
[49]23        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
[44]24        private RemoteObject obj;
[53]25        private transient Serializer serializer;
[44]26        private BlockingQueue<Delivery> deliveryQueue;
27        private boolean killed = false;
28
[53]29        public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue, Serializer serializer) {
[44]30                this.obj = obj;
31                this.deliveryQueue = deliveryQueue;
[53]32                this.serializer = serializer;
[44]33        }
34
35        @Override
36        public void run() {
37                while (!killed) {
38                        try {
39                                // Get the delivery
40                                Delivery delivery = deliveryQueue.take();
41
[47]42                                String serializerType = delivery.getProperties().getType();
43
[44]44                                // Deserialize the json
[53]45                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
[44]46                                // Log.saveLog("Server-Deserialize", delivery.getBody());
47
48                                String methodName = request.getMethod();
49                                String requestID = request.getId();
50
[49]51                                logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID);
[44]52
53                                // Invoke the method
54                                Object result = null;
55                                OmqException error = null;
56                                try {
57                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
58                                } catch (InvocationTargetException e) {
59                                        Throwable throwable = e.getTargetException();
[49]60                                        logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
[44]61                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
62                                } catch (NoSuchMethodException e) {
[49]63                                        logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
[44]64                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
65                                }
66
67                                // Reply if it's necessary
68                                if (!request.isAsync()) {
69                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
70
71                                        Channel channel = obj.getChannel();
72
73                                        BasicProperties props = delivery.getProperties();
74
75                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
76
[53]77                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
[44]78                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
79
80                                        // Log.saveLog("Server-Serialize", bytesResponse);
81                                }
82
83                        } catch (InterruptedException i) {
[49]84                                logger.error(i);
[44]85                                killed = true;
86                        } catch (Exception e) {
[49]87                                logger.error("Object: " + obj.getRef(), e);
[44]88                        }
89
90                }
91        }
92
93        public RemoteObject getObj() {
94                return obj;
95        }
96
97        public void setObj(RemoteObject obj) {
98                this.obj = obj;
99        }
100
101        public BlockingQueue<Delivery> getDeliveryQueue() {
102                return deliveryQueue;
103        }
104
105        public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
106                this.deliveryQueue = deliveryQueue;
107        }
108}
Note: See TracBrowser for help on using the repository browser.