source: trunk/objectmq/src/omq/client/remote/response/ResponseListener.java @ 9

Last change on this file since 9 was 9, checked in by stoda, 11 years ago

First commit

File size: 4.2 KB
Line 
1package omq.client.remote.response;
2
3import java.util.Hashtable;
4import java.util.Map;
5import java.util.Properties;
6
7import omq.client.proxy.Proxymq;
8import omq.common.remote.RemoteListener;
9import omq.common.util.ParameterQueue;
10
11import com.rabbitmq.client.AMQP.BasicProperties;
12import com.rabbitmq.client.ConsumerCancelledException;
13import com.rabbitmq.client.QueueingConsumer;
14import com.rabbitmq.client.QueueingConsumer.Delivery;
15import com.rabbitmq.client.ShutdownSignalException;
16
17/**
18 * Class that inherits from RemoteListener. It's used in the server side. This
19 * class gets the deliveries from the server and stores them into the proxies
20 *
21 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
22 *
23 */
24public class ResponseListener extends RemoteListener {
25        private static ResponseListener rListener;
26        private Map<String, Map<String, byte[]>> results;
27
28        /**
29         * Protected constructor used by the singleton pattern
30         *
31         * @param env
32         * @throws Exception
33         */
34        protected ResponseListener(Properties env) throws Exception {
35                super(env);
36                // Init the hashtable (it's concurrent)
37                this.results = new Hashtable<String, Map<String, byte[]>>();
38
39                String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
40                channel.queueDeclare(reply_queue, false, false, false, null);
41
42                // Declare a new consumer
43                consumer = new QueueingConsumer(channel);
44                channel.basicConsume(reply_queue, true, consumer);
45        }
46
47        @Override
48        public void run() {
49                Delivery delivery;
50                String uid_request;
51
52                while (!killed) {
53                        try {
54                                // Get the delivery
55
56                                delivery = consumer.nextDelivery();
57
58                                BasicProperties props = delivery.getProperties();
59
60                                // Get the response with its uid
61                                uid_request = delivery.getProperties().getCorrelationId();
62
63                                // Stores the new response
64                                Map<String, byte[]> proxyResults = results.get(props.getAppId());
65
66                                // Put the result into the proxy results and notify him
67                                synchronized (proxyResults) {
68                                        // If we haven't received this response before, we store it
69                                        if (!proxyResults.containsKey(uid_request)) {
70                                                proxyResults.put(uid_request, delivery.getBody());
71                                                proxyResults.notifyAll();
72                                        }
73                                }
74                        } catch (InterruptedException i) {
75                                i.printStackTrace();
76                        } catch (ShutdownSignalException e) {
77                                e.printStackTrace();
78                        } catch (ConsumerCancelledException e) {
79                                e.printStackTrace();
80                        } catch (Exception e) {
81                                e.printStackTrace();
82                        }
83                }
84        }
85
86        /**
87         * Static function which initializes the ResponseListener
88         *
89         * @param env
90         * @throws Exception
91         */
92        public static void init(Properties env) throws Exception {
93                if (rListener == null) {
94                        rListener = new ResponseListener(env);
95                        rListener.start();
96                } else {
97                        throw new Exception("Cannot init because it already exists");
98                }
99        }
100
101        /**
102         * Method to retrieve the unique ResponseListener, this function can also
103         * initialize a ResponseListener using and environment
104         *
105         * @param env
106         * @return unique ResponseListener
107         * @throws Exception
108         */
109        public static ResponseListener getRequestListener(Properties env) throws Exception {
110                if (rListener == null) {
111                        rListener = new ResponseListener(env);
112                        rListener.start();
113                } else {
114                        // TODO: create a new exception to indicate that a response listener
115                        // cannot be init
116                        throw new Exception("Cannot init because it already exists");
117                }
118                return rListener;
119        }
120
121        public static boolean isVoid() {
122                return rListener == null;
123        }
124
125        /**
126         * Method to retrieve the unique ResponseListener
127         *
128         * @return
129         * @throws Exception
130         */
131        public static ResponseListener getRequestListener() throws Exception {
132                if (rListener == null) {
133                        throw new Exception("Request listener not initialized");
134                }
135                return rListener;
136        }
137
138        /**
139         *
140         * @param key
141         * @return whether the map has the param key
142         */
143        public boolean containsKey(String key) {
144                return results.containsKey(key);
145        }
146
147        /**
148         * This method is used to kill the unique responseListener in the system
149         *
150         * @throws Exception
151         */
152        public static void stopResponseListner() throws Exception {
153                rListener.kill();
154                rListener = null;
155        }
156
157        // Revisar això
158        public void registerProxy(Proxymq proxy) {
159                if (!results.containsKey(proxy.getRef())) {
160                        results.put(proxy.getRef(), proxy.getResults());
161                }
162        }
163}
Note: See TracBrowser for help on using the repository browser.