source: branches/objectmq-1.0/src/omq/client/remote/response/ResponseListener.java @ 33

Last change on this file since 33 was 33, checked in by amoreno, 11 years ago

new release version

File size: 4.2 KB
Line 
1package omq.client.remote.response;
2
3import java.util.Hashtable;
4import java.util.Map;
5import java.util.Properties;
6
7import omq.client.proxy.Proxymq;
8import omq.common.remote.RemoteListener;
9import omq.common.util.ParameterQueue;
10
11import com.rabbitmq.client.AMQP.BasicProperties;
12import com.rabbitmq.client.ConsumerCancelledException;
13import com.rabbitmq.client.QueueingConsumer;
14import com.rabbitmq.client.QueueingConsumer.Delivery;
15import com.rabbitmq.client.ShutdownSignalException;
16
17/**
18 * Class that inherits from RemoteListener. It's used in the server side. This
19 * class gets the deliveries from the server and stores them into the proxies
20 *
21 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
22 *
23 */
24public class ResponseListener extends RemoteListener {
25        private static ResponseListener rListener;
26        private Map<String, Map<String, byte[]>> results;
27
28        /**
29         * Protected constructor used by the singleton pattern
30         *
31         * @param env
32         * @throws Exception
33         */
34        protected ResponseListener(Properties env) throws Exception {
35                super(env);
36                // Init the hashtable (it's concurrent)
37                this.results = new Hashtable<String, Map<String, byte[]>>();
38
39                String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
40                channel.queueDeclare(reply_queue, false, false, false, null);
41
42                // Declare a new consumer
43                consumer = new QueueingConsumer(channel);
44                channel.basicConsume(reply_queue, true, consumer);
45        }
46
47        @Override
48        public void run() {
49                Delivery delivery;
50                String uid_request;
51
52                while (!killed) {
53                        try {
54                                // Get the delivery
55
56                                delivery = consumer.nextDelivery();
57
58                                BasicProperties props = delivery.getProperties();
59
60                                // Get the response with its uid
61                                uid_request = delivery.getProperties().getCorrelationId();
62                                System.out.println("Response received -> " + uid_request);
63
64                                // Stores the new response
65                                Map<String, byte[]> proxyResults = results.get(props.getAppId());
66
67                                // Put the result into the proxy results and notify him
68                                synchronized (proxyResults) {
69                                        // If we haven't received this response before, we store it
70                                        if (!proxyResults.containsKey(uid_request)) {
71                                                proxyResults.put(uid_request, delivery.getBody());
72                                                proxyResults.notifyAll();
73                                        }
74                                }
75                        } catch (InterruptedException i) {
76                                i.printStackTrace();
77                        } catch (ShutdownSignalException e) {
78                                e.printStackTrace();
79                        } catch (ConsumerCancelledException e) {
80                                e.printStackTrace();
81                        } catch (Exception e) {
82                                e.printStackTrace();
83                        }
84                }
85        }
86
87        /**
88         * Static function which initializes the ResponseListener
89         *
90         * @param env
91         * @throws Exception
92         */
93        public static void init(Properties env) throws Exception {
94                if (rListener == null) {
95                        rListener = new ResponseListener(env);
96                        rListener.start();
97                } else {
98                        throw new Exception("Cannot init because it already exists");
99                }
100        }
101
102        /**
103         * Method to retrieve the unique ResponseListener, this function can also
104         * initialize a ResponseListener using and environment
105         *
106         * @param env
107         * @return unique ResponseListener
108         * @throws Exception
109         */
110        public static ResponseListener getRequestListener(Properties env) throws Exception {
111                if (rListener == null) {
112                        rListener = new ResponseListener(env);
113                        rListener.start();
114                } else {
115                        // TODO: create a new exception to indicate that a response listener
116                        // cannot be init
117                        throw new Exception("Cannot init because it already exists");
118                }
119                return rListener;
120        }
121
122        public static boolean isVoid() {
123                return rListener == null;
124        }
125
126        /**
127         * Method to retrieve the unique ResponseListener
128         *
129         * @return
130         * @throws Exception
131         */
132        public static ResponseListener getRequestListener() throws Exception {
133                if (rListener == null) {
134                        throw new Exception("Request listener not initialized");
135                }
136                return rListener;
137        }
138
139        /**
140         *
141         * @param key
142         * @return whether the map has the param key
143         */
144        public boolean containsKey(String key) {
145                return results.containsKey(key);
146        }
147
148        /**
149         * This method is used to kill the unique responseListener in the system
150         *
151         * @throws Exception
152         */
153        public static void stopResponseListner() throws Exception {
154                rListener.kill();
155                rListener = null;
156        }
157
158        // Revisar això
159        public void registerProxy(Proxymq proxy) {
160                if (!results.containsKey(proxy.getRef())) {
161                        results.put(proxy.getRef(), proxy.getResults());
162                }
163        }
164}
Note: See TracBrowser for help on using the repository browser.