source: branches/supervisor/src/main/java/omq/server/AInvocationThread.java @ 108

Last change on this file since 108 was 108, checked in by stoda, 11 years ago

Refactoring to enable multinvocation thread done.
Now there's a multiinvocation thread which listens to the multiqueue

File size: 3.7 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationTargetException;
5import java.util.Properties;
6
7import omq.common.broker.Broker;
8import omq.common.message.Request;
9import omq.common.message.Response;
10import omq.common.util.Serializer;
11import omq.exception.OmqException;
12
13import org.apache.log4j.Logger;
14
15import com.rabbitmq.client.AMQP.BasicProperties;
16import com.rabbitmq.client.Channel;
17import com.rabbitmq.client.QueueingConsumer;
18import com.rabbitmq.client.QueueingConsumer.Delivery;
19
20public abstract class AInvocationThread extends Thread {
21
22        private static final Logger logger = Logger.getLogger(AInvocationThread.class.getName());
23
24        // RemoteObject
25        protected RemoteObject obj;
26        protected String reference;
27        protected String UID;
28        protected Properties env;
29
30        protected RemoteThreadPool pool;
31
32        // Broker
33        protected Broker broker;
34        protected Serializer serializer;
35
36        // Consumer
37        protected Channel channel;
38        protected QueueingConsumer consumer;
39        protected boolean killed = false;
40
41        public AInvocationThread(RemoteObject obj) throws Exception {
42                this.obj = obj;
43                this.UID = obj.getUID();
44                this.reference = obj.getRef();
45                this.env = obj.getEnv();
46                this.broker = obj.getBroker();
47                this.pool = obj.getPool();
48                this.serializer = broker.getSerializer();
49        }
50
51        @Override
52        public synchronized void start() {
53                try {
54                        startQueues();
55                        super.start();
56                } catch (Exception e) {
57                        logger.error("Cannot start a remoteObject", e);
58                }
59
60        }
61
62        /**
63         * This method starts the queues using the information got in the
64         * environment.
65         *
66         * @throws Exception
67         */
68        protected abstract void startQueues() throws Exception;
69       
70        protected void executeTask(Delivery delivery) throws Exception{
71                String serializerType = delivery.getProperties().getType();
72
73                // Deserialize the request
74                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
75                String methodName = request.getMethod();
76                String requestID = request.getId();
77
78                logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: "
79                                + serializerType);
80
81                // Invoke the method
82                Object result = null;
83                OmqException error = null;
84                try {
85                        result = obj.invokeMethod(request.getMethod(), request.getParams());
86                } catch (InvocationTargetException e) {
87                        Throwable throwable = e.getTargetException();
88                        logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
89                        error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
90                } catch (NoSuchMethodException e) {
91                        logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
92                        error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
93                }
94
95                // Reply if it's necessary
96                if (!request.isAsync()) {
97                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
98
99                        BasicProperties props = delivery.getProperties();
100
101                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId())
102                                        .build();
103
104                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
105                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
106                        logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID
107                                        + " replyTo: " + props.getReplyTo());
108                }
109
110                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
111        }
112
113        public void kill() throws IOException {
114                logger.info("Killing objectmq: " + reference + " thread id");
115                killed = true;
116                interrupt();
117                channel.close();
118        }
119
120        public RemoteObject getObj() {
121                return obj;
122        }
123
124        public void setObj(RemoteObject obj) {
125                this.obj = obj;
126        }
127}
Note: See TracBrowser for help on using the repository browser.