source: trunk/src/main/java/omq/common/event/EventDispatcher.java @ 49

Last change on this file since 49 was 49, checked in by stoda, 11 years ago

log4j added

File size: 5.9 KB
Line 
1package omq.common.event;
2
3import java.util.HashMap;
4import java.util.Map;
5import java.util.Properties;
6import java.util.Vector;
7
8import omq.common.broker.Broker;
9import omq.common.util.ParameterQueue;
10import omq.common.util.Serializer;
11
12import org.apache.log4j.Logger;
13
14import com.rabbitmq.client.Channel;
15import com.rabbitmq.client.ConsumerCancelledException;
16import com.rabbitmq.client.QueueingConsumer;
17import com.rabbitmq.client.QueueingConsumer.Delivery;
18import com.rabbitmq.client.ShutdownSignalException;
19
20/**
21 * This class dispatches the events received in the client side and stores them
22 * into the different listeners that could exists among the different proxies
23 * generated
24 *
25 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
26 *
27 */
28@SuppressWarnings("rawtypes")
29public class EventDispatcher extends Thread {
30        private static final Logger logger = Logger.getLogger(EventDispatcher.class.getName());
31        private static EventDispatcher dispatcher;
32
33        private Map<String, Vector<EventListener>> listeners;
34        private Channel channel;
35        private QueueingConsumer consumer;
36        private Properties env;
37        private boolean killed = false;
38
39        private EventDispatcher(Properties env) throws Exception {
40                this.env = env;
41
42                // Declare the listeners map
43                listeners = new HashMap<String, Vector<EventListener>>();
44
45                startEventQueue();
46
47        }
48
49        private void startEventQueue() throws Exception {
50                // Get a new connection and a new channel
51                channel = Broker.getNewChannel();
52
53                String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
54                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
55                channel.queueDeclare(event_queue, durable, false, false, null);
56
57                // Declare a new consumer
58                consumer = new QueueingConsumer(channel);
59                channel.basicConsume(event_queue, true, consumer);
60        }
61
62        public static void init(Properties env) throws Exception {
63                if (dispatcher == null) {
64                        dispatcher = new EventDispatcher(env);
65                        dispatcher.start();
66                } else {
67                        throw new Exception("Already initialized");
68                }
69        }
70
71        public static void stopEventDispatcher() throws Exception {
72                logger.warn("Stopping EventDispatcher");
73                dispatcher.setListeners(null);
74                dispatcher.killed = true;
75                dispatcher.interrupt();
76                dispatcher.channel.close();
77                dispatcher = null;
78        }
79
80        public static EventDispatcher getDispatcher(Properties env) throws Exception {
81                if (dispatcher == null) {
82                        dispatcher = new EventDispatcher(env);
83                        dispatcher.start();
84                }
85                return dispatcher;
86        }
87
88        public static EventDispatcher getDispatcher() throws Exception {
89                if (dispatcher == null) {
90                        throw new Exception("EventDispatcher not initialized");
91                }
92                return dispatcher;
93        }
94
95        @Override
96        public void run() {
97                Delivery delivery;
98                Event event;
99
100                while (!killed) {
101                        try {
102                                // Get the delivery
103                                delivery = consumer.nextDelivery();
104
105                                // Get the event
106                                event = Serializer.deserializeEvent(delivery.getBody());
107
108                                logger.info("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
109                                // Log.saveLog("Client-Deserialize", delivery.getBody());
110
111                                // long timeEnd = (new Date()).getTime();
112                                // Log.saveTimeSendRequestLog("Client-time-response",
113                                // event.getCorrId(), "Event!", timeEnd);
114
115                                // Dispatch it
116                                dispatch(event.getTopic(), event);
117                        } catch (InterruptedException i) {
118                                logger.error(i);
119                        } catch (ShutdownSignalException e) {
120                                logger.error(e);
121                                try {
122                                        if (channel.isOpen()) {
123                                                channel.close();
124                                        }
125                                        startEventQueue();
126                                } catch (Exception e1) {
127                                        try {
128                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
129                                                Thread.sleep(milis);
130                                        } catch (InterruptedException e2) {
131                                                logger.error(e2);
132                                        }
133                                        logger.error(e1);
134                                }
135                        } catch (ConsumerCancelledException e) {
136                                logger.error(e);
137                        } catch (Exception e) {
138                                logger.error(e);
139                        }
140                }
141        }
142
143        public int addListener(EventListener e) throws Exception {
144                // Map<String, ArrayList<EventListener<Event>>> mListeners =
145                // listeners.get(e.getTopic());
146                // if(mListeners == null){
147                // mListeners = new HashMap<String, ArrayList<EventListener<Event>>>();
148                //
149                // String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
150                // String reference = e.getTopic();
151                // channel.exchangeDeclare(reference, "fanout");
152                // channel.queueBind(queueName, reference, "");
153                // }
154
155                Vector<EventListener> vListeners = listeners.get(e.getTopic());
156                if (vListeners == null) {
157                        vListeners = new Vector<EventListener>();
158
159                        String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
160                        String reference = e.getTopic();
161
162                        logger.info("Declaring fanout -> " + reference + " Binding with: " + queueName);
163
164                        channel.exchangeDeclare(reference, "fanout");
165                        channel.queueBind(queueName, reference, "");
166                }
167                vListeners.add(e);
168                listeners.put(e.getTopic(), vListeners);
169
170                return vListeners.size();
171        }
172
173        public int removeListener(EventListener e) {
174                Vector<EventListener> vListeners = listeners.get(e.getTopic());
175                if (vListeners != null) {
176                        // TODO: removeListener -> remove(e) override equals?
177                        vListeners.remove(e);
178                }
179
180                return vListeners.size();
181        }
182
183        /**
184         * This method dispatches the events. Every time an event is received, this
185         * method is launched. This method creates a new thread and executes the
186         * notifyEvent function of the listeners associated to this event
187         *
188         * @param topic
189         * @param event
190         */
191        public void dispatch(String topic, final Event event) {
192                if (listeners.containsKey(topic)) {
193                        for (final EventListener listener : listeners.get(topic)) {
194                                new Thread() {
195                                        @SuppressWarnings("unchecked")
196                                        public void run() {
197                                                listener.notifyEvent(event);
198                                        }
199                                }.start();
200                        }
201                }
202        }
203
204        public Map<String, Vector<EventListener>> getListeners() {
205                return listeners;
206        }
207
208        public void setListeners(Map<String, Vector<EventListener>> listeners) {
209                this.listeners = listeners;
210        }
211
212        public static boolean isVoid() {
213                return dispatcher == null;
214        }
215
216}
Note: See TracBrowser for help on using the repository browser.