source: trunk/src/main/java/omq/client/listener/ResponseListener.java @ 50

Last change on this file since 50 was 50, checked in by stoda, 11 years ago
File size: 5.6 KB
Line 
1package omq.client.listener;
2
3import java.io.IOException;
4import java.util.HashMap;
5import java.util.Hashtable;
6import java.util.Map;
7import java.util.Properties;
8
9import org.apache.log4j.Logger;
10
11import omq.client.proxy.Proxymq;
12import omq.common.broker.Broker;
13import omq.common.util.ParameterQueue;
14
15import com.rabbitmq.client.AMQP.BasicProperties;
16import com.rabbitmq.client.Channel;
17import com.rabbitmq.client.ConsumerCancelledException;
18import com.rabbitmq.client.QueueingConsumer;
19import com.rabbitmq.client.QueueingConsumer.Delivery;
20import com.rabbitmq.client.ShutdownSignalException;
21
22/**
23 * Class that inherits from RemoteListener. It's used in the server side. This
24 * class gets the deliveries from the server and stores them into the proxies
25 *
26 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
27 *
28 */
29public class ResponseListener extends Thread {
30        private static final Logger logger = Logger.getLogger(ResponseListener.class.getName());
31        private static ResponseListener rListener;
32
33        private Channel channel;
34        private QueueingConsumer consumer;
35        private boolean killed = false;
36        private Map<String, Map<String, byte[]>> results;
37        private Properties env;
38
39        /**
40         * Protected constructor used by the singleton pattern
41         *
42         * @param env
43         * @throws Exception
44         */
45        protected ResponseListener(Properties env) throws Exception {
46                this.env = env;
47
48                // Init the hashtable (it's concurrent)
49                this.results = new Hashtable<String, Map<String, byte[]>>();
50
51                startRPCQueue();
52        }
53
54        @Override
55        public void run() {
56                Delivery delivery;
57                String uid_request;
58
59                while (!killed) {
60                        try {
61                                // Get the delivery
62
63                                delivery = consumer.nextDelivery();
64
65                                BasicProperties props = delivery.getProperties();
66
67                                // Get the response with its uid
68                                uid_request = delivery.getProperties().getCorrelationId();
69                                logger.debug("Response received -> proxy reference: " + props.getAppId() + ", corrId: " + uid_request);
70
71                                // Stores the new response
72                                Map<String, byte[]> proxyResults = results.get(props.getAppId());
73
74                                // Put the result into the proxy results and notify him
75                                synchronized (proxyResults) {
76                                        // If we haven't received this response before, we store it
77                                        if (!proxyResults.containsKey(uid_request)) {
78                                                proxyResults.put(uid_request, delivery.getBody());
79                                                proxyResults.notifyAll();
80                                        }
81                                }
82                        } catch (InterruptedException i) {
83                                logger.error(i.toString(), i);
84                        } catch (ShutdownSignalException e) {
85                                logger.error(e.toString(), e);
86                                try {
87                                        if (channel.isOpen()) {
88                                                channel.close();
89                                        }
90                                        startRPCQueue();
91                                } catch (Exception e1) {
92                                        e1.printStackTrace();
93                                        try {
94                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
95                                                Thread.sleep(milis);
96                                        } catch (InterruptedException e2) {
97                                                logger.error(e2.toString(), e2);
98                                        }
99                                }
100                        } catch (ConsumerCancelledException e) {
101                                logger.error(e.toString(), e);
102                        } catch (Exception e) {
103                                logger.error(e.toString(), e);
104                        }
105                }
106        }
107
108        private void startRPCQueue() throws Exception {
109                channel = Broker.getNewChannel();
110
111                Map<String, Object> args = null;
112
113                String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
114                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
115
116                int ttl = Integer.parseInt(env.getProperty(ParameterQueue.MESSAGE_TTL_IN_QUEUES, "-1"));
117                if (ttl > 0) {
118                        args = new HashMap<String, Object>();
119                        args.put("x-message-ttl", ttl);
120                }
121
122                channel.queueDeclare(reply_queue, durable, false, false, args);
123
124                // Declare a new consumer
125                consumer = new QueueingConsumer(channel);
126                channel.basicConsume(reply_queue, true, consumer);
127        }
128
129        /**
130         * Static function which initializes the ResponseListener
131         *
132         * @param env
133         * @throws Exception
134         */
135        public static void init(Properties env) throws Exception {
136                if (rListener == null) {
137                        rListener = new ResponseListener(env);
138                        rListener.start();
139                } else {
140                        throw new Exception("Cannot init because it already exists");
141                }
142        }
143
144        /**
145         * Method to retrieve the unique ResponseListener, this function can also
146         * initialize a ResponseListener using and environment
147         *
148         * @param env
149         * @return unique ResponseListener
150         * @throws Exception
151         */
152        public static ResponseListener getRequestListener(Properties env) throws Exception {
153                if (rListener == null) {
154                        rListener = new ResponseListener(env);
155                        rListener.start();
156                } else {
157                        // TODO: create a new exception to indicate that a response listener
158                        // cannot be init
159                        throw new Exception("Cannot init because it already exists");
160                }
161                return rListener;
162        }
163
164        public static boolean isVoid() {
165                return rListener == null;
166        }
167
168        /**
169         * Method to retrieve the unique ResponseListener
170         *
171         * @return
172         * @throws Exception
173         */
174        public static ResponseListener getRequestListener() throws Exception {
175                if (rListener == null) {
176                        throw new Exception("Request listener not initialized");
177                }
178                return rListener;
179        }
180
181        /**
182         *
183         * @param key
184         * @return whether the map has the param key
185         */
186        public boolean containsKey(String key) {
187                return results.containsKey(key);
188        }
189
190        /**
191         * This method is used to kill the unique responseListener in the system
192         *
193         * @throws Exception
194         */
195        public static void stopResponseListner() throws Exception {
196                rListener.kill();
197                rListener = null;
198        }
199
200        /**
201         * Interrupt and kill the Thread
202         *
203         * @throws IOException
204         */
205        public void kill() throws IOException {
206                logger.warn("Killing ResponseListener");
207                interrupt();
208                killed = true;
209                channel.close();
210        }
211
212        // Revisar això
213        public void registerProxy(Proxymq proxy) {
214                if (!results.containsKey(proxy.getRef())) {
215                        results.put(proxy.getRef(), proxy.getResults());
216                }
217        }
218}
Note: See TracBrowser for help on using the repository browser.