source: branches/objectmq_old/src/omq/client/listener/ResponseListener.java

Last change on this file was 36, checked in by stoda, 11 years ago

Fault tolerance in the client side added.
Some refactoring in proxymq, broker, environment and the client listeners

File size: 5.4 KB
Line 
1package omq.client.listener;
2
3import java.io.IOException;
4import java.util.HashMap;
5import java.util.Hashtable;
6import java.util.Map;
7import java.util.Properties;
8
9import omq.client.proxy.Proxymq;
10import omq.common.broker.Broker;
11import omq.common.util.ParameterQueue;
12
13import com.rabbitmq.client.AMQP.BasicProperties;
14import com.rabbitmq.client.Channel;
15import com.rabbitmq.client.ConsumerCancelledException;
16import com.rabbitmq.client.QueueingConsumer;
17import com.rabbitmq.client.QueueingConsumer.Delivery;
18import com.rabbitmq.client.ShutdownSignalException;
19
20/**
21 * Class that inherits from RemoteListener. It's used in the server side. This
22 * class gets the deliveries from the server and stores them into the proxies
23 *
24 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
25 *
26 */
27public class ResponseListener extends Thread {
28        private static ResponseListener rListener;
29
30        private Channel channel;
31        private QueueingConsumer consumer;
32        private boolean killed = false;
33        private Map<String, Map<String, byte[]>> results;
34        private Properties env;
35
36        /**
37         * Protected constructor used by the singleton pattern
38         *
39         * @param env
40         * @throws Exception
41         */
42        protected ResponseListener(Properties env) throws Exception {
43                this.env = env;
44
45                // Init the hashtable (it's concurrent)
46                this.results = new Hashtable<String, Map<String, byte[]>>();
47
48                startRPCQueue();
49        }
50
51        @Override
52        public void run() {
53                Delivery delivery;
54                String uid_request;
55
56                while (!killed) {
57                        try {
58                                // Get the delivery
59
60                                delivery = consumer.nextDelivery();
61
62                                BasicProperties props = delivery.getProperties();
63
64                                // Get the response with its uid
65                                uid_request = delivery.getProperties().getCorrelationId();
66                                System.out.println("Response received -> " + uid_request);
67
68                                // Stores the new response
69                                Map<String, byte[]> proxyResults = results.get(props.getAppId());
70
71                                // Put the result into the proxy results and notify him
72                                synchronized (proxyResults) {
73                                        // If we haven't received this response before, we store it
74                                        if (!proxyResults.containsKey(uid_request)) {
75                                                proxyResults.put(uid_request, delivery.getBody());
76                                                proxyResults.notifyAll();
77                                        }
78                                }
79                        } catch (InterruptedException i) {
80                                i.printStackTrace();
81                        } catch (ShutdownSignalException e) {
82                                e.printStackTrace();
83                                try {
84                                        if (channel.isOpen()) {
85                                                channel.close();
86                                        }
87                                        startRPCQueue();
88                                } catch (Exception e1) {
89                                        e1.printStackTrace();
90                                        try {
91                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
92                                                Thread.sleep(milis);
93                                        } catch (InterruptedException e2) {
94                                                e2.printStackTrace();
95                                        }
96                                }
97                        } catch (ConsumerCancelledException e) {
98                                e.printStackTrace();
99                        } catch (Exception e) {
100                                e.printStackTrace();
101                        }
102                }
103        }
104
105        private void startRPCQueue() throws Exception {
106                channel = Broker.getNewChannel();
107
108                Map<String, Object> args = null;
109
110                String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
111                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
112
113                int ttl = Integer.parseInt(env.getProperty(ParameterQueue.MESSAGE_TTL_IN_QUEUES, "-1"));
114                if (ttl > 0) {
115                        args = new HashMap<String, Object>();
116                        args.put("x-message-ttl", ttl);
117                }
118
119                channel.queueDeclare(reply_queue, durable, false, false, args);
120
121                // Declare a new consumer
122                consumer = new QueueingConsumer(channel);
123                channel.basicConsume(reply_queue, true, consumer);
124        }
125
126        /**
127         * Static function which initializes the ResponseListener
128         *
129         * @param env
130         * @throws Exception
131         */
132        public static void init(Properties env) throws Exception {
133                if (rListener == null) {
134                        rListener = new ResponseListener(env);
135                        rListener.start();
136                } else {
137                        throw new Exception("Cannot init because it already exists");
138                }
139        }
140
141        /**
142         * Method to retrieve the unique ResponseListener, this function can also
143         * initialize a ResponseListener using and environment
144         *
145         * @param env
146         * @return unique ResponseListener
147         * @throws Exception
148         */
149        public static ResponseListener getRequestListener(Properties env) throws Exception {
150                if (rListener == null) {
151                        rListener = new ResponseListener(env);
152                        rListener.start();
153                } else {
154                        // TODO: create a new exception to indicate that a response listener
155                        // cannot be init
156                        throw new Exception("Cannot init because it already exists");
157                }
158                return rListener;
159        }
160
161        public static boolean isVoid() {
162                return rListener == null;
163        }
164
165        /**
166         * Method to retrieve the unique ResponseListener
167         *
168         * @return
169         * @throws Exception
170         */
171        public static ResponseListener getRequestListener() throws Exception {
172                if (rListener == null) {
173                        throw new Exception("Request listener not initialized");
174                }
175                return rListener;
176        }
177
178        /**
179         *
180         * @param key
181         * @return whether the map has the param key
182         */
183        public boolean containsKey(String key) {
184                return results.containsKey(key);
185        }
186
187        /**
188         * This method is used to kill the unique responseListener in the system
189         *
190         * @throws Exception
191         */
192        public static void stopResponseListner() throws Exception {
193                rListener.kill();
194                rListener = null;
195        }
196
197        /**
198         * Interrupt and kill the Thread
199         *
200         * @throws IOException
201         */
202        public void kill() throws IOException {
203                interrupt();
204                killed = true;
205                channel.close();
206        }
207
208        // Revisar això
209        public void registerProxy(Proxymq proxy) {
210                if (!results.containsKey(proxy.getRef())) {
211                        results.put(proxy.getRef(), proxy.getResults());
212                }
213        }
214}
Note: See TracBrowser for help on using the repository browser.