source: branches/supervisor/src/main/java/omq/server/InvocationThread.java @ 91

Last change on this file since 91 was 91, checked in by stoda, 11 years ago

Semaphores added and removed, ack error discovered and solutioned... Some tests added

Supervisor interface created and more things I'll do later...

TODO: supervisor!!

File size: 3.7 KB
Line 
1package omq.server;
2
3import java.lang.reflect.InvocationTargetException;
4import java.util.concurrent.BlockingQueue;
5
6import omq.common.message.Request;
7import omq.common.message.Response;
8import omq.common.util.Serializer;
9import omq.exception.OmqException;
10
11import org.apache.log4j.Logger;
12
13import com.rabbitmq.client.AMQP.BasicProperties;
14import com.rabbitmq.client.Channel;
15import com.rabbitmq.client.QueueingConsumer.Delivery;
16
17/**
18 * An invocationThread waits for requests an invokes them.
19 *
20 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
21 *
22 */
23public class InvocationThread extends Thread {
24        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
25        private RemoteObject obj;
26        private Serializer serializer;
27        // private RemoteWrapper wrapper;
28        private BlockingQueue<Delivery> deliveryQueue;
29        private boolean killed = false;
30
31        public InvocationThread(RemoteObject obj, RemoteWrapper wrapper, Serializer serializer) {
32                this.obj = obj;
33                // this.wrapper = wrapper;
34                this.deliveryQueue = wrapper.getDeliveryQueue();
35                this.serializer = serializer;
36        }
37
38        @Override
39        public void run() {
40                while (!killed) {
41                        try {
42                                // Get the delivery
43                                Delivery delivery = deliveryQueue.take();
44
45                                // // Indicate this thread is not available
46                                // wrapper.increaseBusy();
47
48                                String serializerType = delivery.getProperties().getType();
49
50                                // Deserialize the json
51                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
52                                String methodName = request.getMethod();
53                                String requestID = request.getId();
54
55                                logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType);
56
57                                // Invoke the method
58                                Object result = null;
59                                OmqException error = null;
60                                try {
61                                        result = obj.invokeMethod(request.getMethod(), request.getParams());
62                                } catch (InvocationTargetException e) {
63                                        Throwable throwable = e.getTargetException();
64                                        logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
65                                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
66                                } catch (NoSuchMethodException e) {
67                                        logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
68                                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
69                                }
70
71                               
72                                Channel channel = obj.getChannel();
73                               
74                               
75                                // Reply if it's necessary
76                                if (!request.isAsync()) {
77                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
78
79                                       
80
81                                        BasicProperties props = delivery.getProperties();
82
83                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
84
85                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
86                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
87                                        logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: "
88                                                        + props.getReplyTo());
89                                }
90                               
91                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
92                               
93                                // // Indicate this thread is available
94                                // wrapper.decreaseBusy();
95                        } catch (InterruptedException i) {
96                                logger.error(i);
97                                killed = true;
98                        } catch (Exception e) {
99                                logger.error("Object: " + obj.getRef(), e);
100                        }
101
102                }
103        }
104
105        public RemoteObject getObj() {
106                return obj;
107        }
108
109        public void setObj(RemoteObject obj) {
110                this.obj = obj;
111        }
112
113        public BlockingQueue<Delivery> getDeliveryQueue() {
114                return deliveryQueue;
115        }
116
117        public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
118                this.deliveryQueue = deliveryQueue;
119        }
120}
Note: See TracBrowser for help on using the repository browser.