source: trunk/src/main/java/omq/common/event/EventDispatcher.java @ 54

Last change on this file since 54 was 54, checked in by stoda, 11 years ago

Adding @MultiMethod?
Broker is not a singleton.

File size: 5.2 KB
Line 
1package omq.common.event;
2
3import java.util.HashMap;
4import java.util.Map;
5import java.util.Properties;
6import java.util.Vector;
7
8import omq.common.broker.Broker;
9import omq.common.util.ParameterQueue;
10import omq.common.util.Serializer;
11
12import org.apache.log4j.Logger;
13
14import com.rabbitmq.client.Channel;
15import com.rabbitmq.client.ConsumerCancelledException;
16import com.rabbitmq.client.QueueingConsumer;
17import com.rabbitmq.client.QueueingConsumer.Delivery;
18import com.rabbitmq.client.ShutdownSignalException;
19
20/**
21 * This class dispatches the events received in the client side and stores them
22 * into the different listeners that could exists among the different proxies
23 * generated
24 *
25 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
26 *
27 */
28@SuppressWarnings("rawtypes")
29public class EventDispatcher extends Thread {
30        private static final Logger logger = Logger.getLogger(EventDispatcher.class.getName());
31
32        private Broker broker;
33        private Serializer serializer;
34        private Map<String, Vector<EventListener>> listeners;
35        private Channel channel;
36        private QueueingConsumer consumer;
37        private Properties env;
38        private boolean killed = false;
39
40        public EventDispatcher(Broker broker) throws Exception {
41                this.broker = broker;
42                env = broker.getEnvironment();
43                serializer = broker.getSerializer();
44
45                // Declare the listeners map
46                listeners = new HashMap<String, Vector<EventListener>>();
47
48                startEventQueue();
49        }
50
51        private void startEventQueue() throws Exception {
52                // Get a new connection and a new channel
53                channel = broker.getNewChannel();
54
55                String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
56                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
57                channel.queueDeclare(event_queue, durable, false, false, null);
58                logger.info("EventDispatcher creating queue: " + event_queue + ", durable: " + durable);
59
60                // Declare a new consumer
61                consumer = new QueueingConsumer(channel);
62                channel.basicConsume(event_queue, true, consumer);
63        }
64
65        public void kill() throws Exception {
66                logger.warn("Stopping EventDispatcher");
67                setListeners(null);
68                killed = true;
69                interrupt();
70                channel.close();
71        }
72
73        @Override
74        public void run() {
75                logger.info("EventDispatcher started");
76                Delivery delivery;
77                Event event;
78
79                while (!killed) {
80                        try {
81                                // Get the delivery
82                                delivery = consumer.nextDelivery();
83
84                                // Get the event
85                                event = serializer.deserializeEvent(delivery.getBody());
86
87                                logger.info("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
88
89                                // Dispatch it
90                                dispatch(event.getTopic(), event);
91                        } catch (InterruptedException i) {
92                                logger.error(i);
93                        } catch (ShutdownSignalException e) {
94                                logger.error(e);
95                                try {
96                                        if (channel.isOpen()) {
97                                                channel.close();
98                                        }
99                                        startEventQueue();
100                                } catch (Exception e1) {
101                                        try {
102                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
103                                                Thread.sleep(milis);
104                                        } catch (InterruptedException e2) {
105                                                logger.error(e2);
106                                        }
107                                        logger.error(e1);
108                                }
109                        } catch (ConsumerCancelledException e) {
110                                logger.error(e);
111                        } catch (Exception e) {
112                                logger.error(e);
113                        }
114                }
115        }
116
117        public int addListener(EventListener e) throws Exception {
118                // Map<String, ArrayList<EventListener<Event>>> mListeners =
119                // listeners.get(e.getTopic());
120                // if(mListeners == null){
121                // mListeners = new HashMap<String, ArrayList<EventListener<Event>>>();
122                //
123                // String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
124                // String reference = e.getTopic();
125                // channel.exchangeDeclare(reference, "fanout");
126                // channel.queueBind(queueName, reference, "");
127                // }
128
129                Vector<EventListener> vListeners = listeners.get(e.getTopic());
130                if (vListeners == null) {
131                        vListeners = new Vector<EventListener>();
132
133                        String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
134                        String reference = e.getTopic();
135
136                        logger.info("Declaring fanout -> " + reference + " Binding with: " + queueName);
137
138                        channel.exchangeDeclare(reference, "fanout");
139                        channel.queueBind(queueName, reference, "");
140                }
141                vListeners.add(e);
142                listeners.put(e.getTopic(), vListeners);
143
144                return vListeners.size();
145        }
146
147        public int removeListener(EventListener e) {
148                Vector<EventListener> vListeners = listeners.get(e.getTopic());
149                if (vListeners != null) {
150                        // TODO: removeListener -> remove(e) override equals?
151                        vListeners.remove(e);
152                }
153
154                return vListeners.size();
155        }
156
157        /**
158         * This method dispatches the events. Every time an event is received, this
159         * method is launched. This method creates a new thread and executes the
160         * notifyEvent function of the listeners associated to this event
161         *
162         * @param topic
163         * @param event
164         */
165        public void dispatch(String topic, final Event event) {
166                if (listeners.containsKey(topic)) {
167                        for (final EventListener listener : listeners.get(topic)) {
168                                new Thread() {
169                                        @SuppressWarnings("unchecked")
170                                        public void run() {
171                                                listener.notifyEvent(event);
172                                        }
173                                }.start();
174                        }
175                }
176        }
177
178        public Map<String, Vector<EventListener>> getListeners() {
179                return listeners;
180        }
181
182        public void setListeners(Map<String, Vector<EventListener>> listeners) {
183                this.listeners = listeners;
184        }
185
186}
Note: See TracBrowser for help on using the repository browser.