source: trunk/src/main/java/omq/common/event/EventDispatcher.java @ 53

Last change on this file since 53 was 53, checked in by stoda, 11 years ago

Non static broker
TODO: change all test to see whether the new broker configuration works

File size: 5.2 KB
Line 
1package omq.common.event;
2
3import java.util.HashMap;
4import java.util.Map;
5import java.util.Properties;
6import java.util.Vector;
7
8import omq.common.broker.Broker;
9import omq.common.util.ParameterQueue;
10import omq.common.util.Serializer;
11
12import org.apache.log4j.Logger;
13
14import com.rabbitmq.client.Channel;
15import com.rabbitmq.client.ConsumerCancelledException;
16import com.rabbitmq.client.QueueingConsumer;
17import com.rabbitmq.client.QueueingConsumer.Delivery;
18import com.rabbitmq.client.ShutdownSignalException;
19
20/**
21 * This class dispatches the events received in the client side and stores them
22 * into the different listeners that could exists among the different proxies
23 * generated
24 *
25 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
26 *
27 */
28@SuppressWarnings("rawtypes")
29public class EventDispatcher extends Thread {
30        private static final Logger logger = Logger.getLogger(EventDispatcher.class.getName());
31
32        private Broker broker;
33        private Serializer serializer;
34        private Map<String, Vector<EventListener>> listeners;
35        private Channel channel;
36        private QueueingConsumer consumer;
37        private Properties env;
38        private boolean killed = false;
39
40        public EventDispatcher(Broker broker) throws Exception {
41                this.broker = broker;
42                env = broker.getEnvironment();
43                serializer = broker.getSerializer();
44
45                // Declare the listeners map
46                listeners = new HashMap<String, Vector<EventListener>>();
47
48                startEventQueue();
49        }
50
51        private void startEventQueue() throws Exception {
52                // Get a new connection and a new channel
53                channel = broker.getNewChannel();
54
55                String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
56                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
57                channel.queueDeclare(event_queue, durable, false, false, null);
58
59                // Declare a new consumer
60                consumer = new QueueingConsumer(channel);
61                channel.basicConsume(event_queue, true, consumer);
62        }
63
64        public void kill() throws Exception {
65                logger.warn("Stopping EventDispatcher");
66                setListeners(null);
67                killed = true;
68                interrupt();
69                channel.close();
70        }
71
72        @Override
73        public void run() {
74                Delivery delivery;
75                Event event;
76
77                while (!killed) {
78                        try {
79                                // Get the delivery
80                                delivery = consumer.nextDelivery();
81
82                                // Get the event
83                                event = serializer.deserializeEvent(delivery.getBody());
84
85                                logger.info("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
86                                // Log.saveLog("Client-Deserialize", delivery.getBody());
87
88                                // long timeEnd = (new Date()).getTime();
89                                // Log.saveTimeSendRequestLog("Client-time-response",
90                                // event.getCorrId(), "Event!", timeEnd);
91
92                                // Dispatch it
93                                dispatch(event.getTopic(), event);
94                        } catch (InterruptedException i) {
95                                logger.error(i);
96                        } catch (ShutdownSignalException e) {
97                                logger.error(e);
98                                try {
99                                        if (channel.isOpen()) {
100                                                channel.close();
101                                        }
102                                        startEventQueue();
103                                } catch (Exception e1) {
104                                        try {
105                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
106                                                Thread.sleep(milis);
107                                        } catch (InterruptedException e2) {
108                                                logger.error(e2);
109                                        }
110                                        logger.error(e1);
111                                }
112                        } catch (ConsumerCancelledException e) {
113                                logger.error(e);
114                        } catch (Exception e) {
115                                logger.error(e);
116                        }
117                }
118        }
119
120        public int addListener(EventListener e) throws Exception {
121                // Map<String, ArrayList<EventListener<Event>>> mListeners =
122                // listeners.get(e.getTopic());
123                // if(mListeners == null){
124                // mListeners = new HashMap<String, ArrayList<EventListener<Event>>>();
125                //
126                // String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
127                // String reference = e.getTopic();
128                // channel.exchangeDeclare(reference, "fanout");
129                // channel.queueBind(queueName, reference, "");
130                // }
131
132                Vector<EventListener> vListeners = listeners.get(e.getTopic());
133                if (vListeners == null) {
134                        vListeners = new Vector<EventListener>();
135
136                        String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
137                        String reference = e.getTopic();
138
139                        logger.info("Declaring fanout -> " + reference + " Binding with: " + queueName);
140
141                        channel.exchangeDeclare(reference, "fanout");
142                        channel.queueBind(queueName, reference, "");
143                }
144                vListeners.add(e);
145                listeners.put(e.getTopic(), vListeners);
146
147                return vListeners.size();
148        }
149
150        public int removeListener(EventListener e) {
151                Vector<EventListener> vListeners = listeners.get(e.getTopic());
152                if (vListeners != null) {
153                        // TODO: removeListener -> remove(e) override equals?
154                        vListeners.remove(e);
155                }
156
157                return vListeners.size();
158        }
159
160        /**
161         * This method dispatches the events. Every time an event is received, this
162         * method is launched. This method creates a new thread and executes the
163         * notifyEvent function of the listeners associated to this event
164         *
165         * @param topic
166         * @param event
167         */
168        public void dispatch(String topic, final Event event) {
169                if (listeners.containsKey(topic)) {
170                        for (final EventListener listener : listeners.get(topic)) {
171                                new Thread() {
172                                        @SuppressWarnings("unchecked")
173                                        public void run() {
174                                                listener.notifyEvent(event);
175                                        }
176                                }.start();
177                        }
178                }
179        }
180
181        public Map<String, Vector<EventListener>> getListeners() {
182                return listeners;
183        }
184
185        public void setListeners(Map<String, Vector<EventListener>> listeners) {
186                this.listeners = listeners;
187        }
188
189}
Note: See TracBrowser for help on using the repository browser.