source: trunk/src/main/java/omq/client/listener/ResponseListener.java @ 84

Last change on this file since 84 was 84, checked in by stoda, 11 years ago

Default queues added, default exchange enabled, more control in remote queues added.
Tests verified and changed Persistent test to show how to make persistent messages.

File size: 5.0 KB
Line 
1package omq.client.listener;
2
3import java.io.IOException;
4import java.util.HashMap;
5import java.util.Hashtable;
6import java.util.Map;
7import java.util.Properties;
8
9import org.apache.log4j.Logger;
10
11import omq.client.proxy.Proxymq;
12import omq.common.broker.Broker;
13import omq.common.util.ParameterQueue;
14
15import com.rabbitmq.client.AMQP.BasicProperties;
16import com.rabbitmq.client.Channel;
17import com.rabbitmq.client.ConsumerCancelledException;
18import com.rabbitmq.client.QueueingConsumer;
19import com.rabbitmq.client.QueueingConsumer.Delivery;
20import com.rabbitmq.client.ShutdownSignalException;
21
22/**
23 * Class that inherits from Thread. It's used in the client side. This class
24 * gets the deliveries from the server and stores them into the different
25 * proxies created.
26 *
27 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
28 *
29 */
30public class ResponseListener extends Thread {
31        private final Logger logger = Logger.getLogger(ResponseListener.class.getName());
32
33        private Broker broker;
34        private Channel channel;
35        private QueueingConsumer consumer;
36        private boolean killed = false;
37        private Map<String, Map<String, byte[]>> results;
38        private Properties env;
39
40        /**
41         * ResponseListener constructor
42         *
43         * @param broker
44         * @throws Exception
45         */
46        public ResponseListener(Broker broker) throws Exception {
47                this.broker = broker;
48                env = broker.getEnvironment();
49
50                // Init the hashtable (it's concurrent)
51                results = new Hashtable<String, Map<String, byte[]>>();
52
53                startRPCQueue();
54        }
55
56        @Override
57        public void run() {
58                logger.info("ResponseListener started");
59                Delivery delivery;
60                String uid_request;
61
62                while (!killed) {
63                        try {
64                                // Get the delivery
65
66                                delivery = consumer.nextDelivery();
67
68                                BasicProperties props = delivery.getProperties();
69
70                                // Get the response with its uid
71                                uid_request = delivery.getProperties().getCorrelationId();
72                                logger.debug("Response received -> proxy reference: " + props.getAppId() + ", corrId: " + uid_request);
73
74                                // Stores the new response
75                                Map<String, byte[]> proxyResults = results.get(props.getAppId());
76
77                                // Put the result into the proxy results and notify him
78                                synchronized (proxyResults) {
79                                        // If we haven't received this response before, we store it
80                                        if (!proxyResults.containsKey(uid_request)) {
81                                                proxyResults.put(uid_request, delivery.getBody());
82                                                proxyResults.notifyAll();
83                                        }
84                                }
85                        } catch (InterruptedException i) {
86                                logger.error(i.toString(), i);
87                        } catch (ShutdownSignalException e) {
88                                logger.error(e.toString(), e);
89                                try {
90                                        if (channel.isOpen()) {
91                                                channel.close();
92                                        }
93                                        startRPCQueue();
94                                } catch (Exception e1) {
95                                        e1.printStackTrace();
96                                        try {
97                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
98                                                Thread.sleep(milis);
99                                        } catch (InterruptedException e2) {
100                                                logger.error(e2.toString(), e2);
101                                        }
102                                }
103                        } catch (ConsumerCancelledException e) {
104                                logger.error(e.toString(), e);
105                        } catch (Exception e) {
106                                logger.error(e.toString(), e);
107                        }
108                }
109        }
110
111        /**
112         * This function is used to start the response client queue
113         *
114         * @throws Exception
115         */
116        private void startRPCQueue() throws Exception {
117                channel = broker.getNewChannel();
118
119                Map<String, Object> args = null;
120
121                String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
122                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
123                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "true"));
124                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "true"));
125
126                int ttl = Integer.parseInt(env.getProperty(ParameterQueue.MESSAGE_TTL_IN_QUEUES, "-1"));
127                if (ttl > 0) {
128                        args = new HashMap<String, Object>();
129                        args.put("x-message-ttl", ttl);
130                }
131
132                if (reply_queue == null) {
133                        reply_queue = channel.queueDeclare().getQueue();
134                        env.setProperty(ParameterQueue.RPC_REPLY_QUEUE, reply_queue);
135                } else {
136                        channel.queueDeclare(reply_queue, durable, exclusive, autoDelete, args);
137                }
138                logger.info("ResponseListener creating queue: " + reply_queue + ", durable: " + durable + ", exclusive: " + exclusive + ", autoDelete: " + autoDelete
139                                + ", TTL: " + (ttl > 0 ? ttl : "not set"));
140
141                // Declare a new consumer
142                consumer = new QueueingConsumer(channel);
143                channel.basicConsume(reply_queue, true, consumer);
144        }
145
146        /**
147         *
148         * @param key
149         * @return whether the map has the param key
150         */
151        public boolean containsKey(String key) {
152                return results.containsKey(key);
153        }
154
155        /**
156         * Interrupt and kill the Thread
157         *
158         * @throws IOException
159         */
160        public void kill() throws IOException {
161                logger.warn("Killing ResponseListener");
162                interrupt();
163                killed = true;
164                channel.close();
165        }
166
167        /**
168         * This method registers a new proxy into this responseListener
169         *
170         * @param proxy
171         */
172        public void registerProxy(Proxymq proxy) {
173                // Since results is a hashtable this method doesn't need to be
174                // synchronized
175                if (!results.containsKey(proxy.getRef())) {
176                        results.put(proxy.getRef(), proxy.getResults());
177                }
178        }
179}
Note: See TracBrowser for help on using the repository browser.