source: trunk/objectmq/src/omq/server/remote/request/InvocationThread.java @ 10

Last change on this file since 10 was 10, checked in by stoda, 11 years ago

bug solved: invoke functions without params

File size: 1.9 KB
Line 
1package omq.server.remote.request;
2
3import java.util.concurrent.BlockingQueue;
4
5import omq.common.message.Request;
6import omq.common.message.Response;
7import omq.common.util.Serializer;
8
9import com.rabbitmq.client.AMQP.BasicProperties;
10import com.rabbitmq.client.Channel;
11import com.rabbitmq.client.QueueingConsumer.Delivery;
12
13/**
14 *
15 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
16 *
17 */
18public class InvocationThread extends Thread {
19        private RemoteObject obj;
20        private BlockingQueue<Delivery> deliveryQueue;
21        private boolean killed = false;
22
23        public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue) {
24                this.obj = obj;
25                this.deliveryQueue = deliveryQueue;
26        }
27
28        @Override
29        public void run() {
30                while (!killed) {
31                        try {
32                                // Get the delivery
33                                Delivery delivery = deliveryQueue.take();
34
35                                // Deserialize the json
36                                Request request = Serializer.deserializeRequest(delivery.getBody(), obj);
37
38                                // Invoke the method
39                                Object result = obj.invokeMethod(request.getMethod(), request.getArguments());
40
41                                if (!request.isAsync()) {
42                                        Response resp = new Response(request.getId(), obj.getRef(), result);
43
44                                        Channel channel = obj.getChannel();
45
46                                        BasicProperties props = delivery.getProperties();
47
48                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
49
50                                        channel.basicPublish("", props.getReplyTo(), replyProps, Serializer.serialize(resp));
51                                }
52
53                        } catch (InterruptedException i) {
54                                killed = true;
55                        } catch (Exception e) {
56                                System.out.println("Error a l'Invocation Thread \nException: " + e);
57                                e.printStackTrace();
58                        }
59
60                }
61        }
62
63        public RemoteObject getObj() {
64                return obj;
65        }
66
67        public void setObj(RemoteObject obj) {
68                this.obj = obj;
69        }
70
71        public BlockingQueue<Delivery> getDeliveryQueue() {
72                return deliveryQueue;
73        }
74
75        public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
76                this.deliveryQueue = deliveryQueue;
77        }
78}
Note: See TracBrowser for help on using the repository browser.