source: branches/objectmq-1.0/src/omq/common/event/EventDispatcher.java

Last change on this file was 33, checked in by amoreno, 11 years ago

new release version

File size: 5.4 KB
Line 
1package omq.common.event;
2
3import java.util.HashMap;
4import java.util.Map;
5import java.util.Properties;
6import java.util.Vector;
7
8import omq.common.broker.Broker;
9import omq.common.util.ParameterQueue;
10import omq.common.util.Serializer;
11
12import com.rabbitmq.client.Channel;
13import com.rabbitmq.client.ConsumerCancelledException;
14import com.rabbitmq.client.QueueingConsumer;
15import com.rabbitmq.client.QueueingConsumer.Delivery;
16import com.rabbitmq.client.ShutdownSignalException;
17
18/**
19 * This class dispatches the events received in the client side and stores them
20 * into the different listeners that could exists among the different proxies
21 * generated
22 *
23 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
24 *
25 */
26public class EventDispatcher extends Thread {
27        private static EventDispatcher dispatcher;
28
29        private Map<String, Vector<EventListener>> listeners;
30        private Channel channel;
31        private QueueingConsumer consumer;
32        private Properties env;
33        private boolean killed = false;
34
35        private EventDispatcher(Properties env) throws Exception {
36                this.env = env;
37
38                // Declare the listeners map
39                listeners = new HashMap<String, Vector<EventListener>>();
40
41                // Get a new connection and a new channel
42                channel = Broker.getNewChannel();
43
44                String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
45                channel.queueDeclare(event_queue, false, false, false, null);
46
47                // Declare a new consumer
48                consumer = new QueueingConsumer(channel);
49                channel.basicConsume(event_queue, true, consumer);
50        }
51
52        public static void init(Properties env) throws Exception {
53                if (dispatcher == null) {
54                        dispatcher = new EventDispatcher(env);
55                        dispatcher.start();
56                } else {
57                        throw new Exception("Already initialized");
58                }
59        }
60
61        public static void stopEventDispatcher() throws Exception {
62                dispatcher.setListeners(null);
63                dispatcher.killed = true;
64                dispatcher.interrupt();
65                dispatcher.channel.close();
66                dispatcher = null;
67        }
68
69        public static EventDispatcher getDispatcher(Properties env) throws Exception {
70                if (dispatcher == null) {
71                        dispatcher = new EventDispatcher(env);
72                        dispatcher.start();
73                }
74                return dispatcher;
75        }
76
77        public static EventDispatcher getDispatcher() throws Exception {
78                if (dispatcher == null) {
79                        throw new Exception("EventDispatcher not initialized");
80                }
81                return dispatcher;
82        }
83
84        @Override
85        public void run() {
86                Delivery delivery;
87                Event event;
88
89                while (!killed) {
90                        try {
91                                // Get the delivery
92                                delivery = consumer.nextDelivery();
93
94                                // Get the event
95                                event = Serializer.deserializeEvent(delivery.getBody());
96
97                                System.out.println("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
98                                //Log.saveLog("Client-Deserialize", delivery.getBody());
99                               
100                                //long timeEnd = (new Date()).getTime();
101                                //Log.saveTimeSendRequestLog("Client-time-response", event.getCorrId(), "Event!",  timeEnd);
102                               
103                                // Dispatch it
104                                dispatch(event.getTopic(), event);
105                        } catch (InterruptedException i) {
106                                System.out.println("InterruptedException e: " + i);
107                                i.printStackTrace();
108                        } catch (ShutdownSignalException e) {
109                                System.out.println("ShutdownSignalException e: " + e);
110                                e.printStackTrace();
111                        } catch (ConsumerCancelledException e) {
112                                System.out.println("ConsumerCancelledException e: " + e);
113                                e.printStackTrace();
114                        } catch (Exception e) {
115                                System.out.println("Exception e: " + e);
116                                e.printStackTrace();
117                        }
118                }
119        }
120
121        public int addListener(EventListener e) throws Exception {
122                // Map<String, ArrayList<EventListener<Event>>> mListeners =
123                // listeners.get(e.getTopic());
124                // if(mListeners == null){
125                // mListeners = new HashMap<String, ArrayList<EventListener<Event>>>();
126                //
127                // String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
128                // String reference = e.getTopic();
129                // channel.exchangeDeclare(reference, "fanout");
130                // channel.queueBind(queueName, reference, "");
131                // }
132
133                Vector<EventListener> vListeners = listeners.get(e.getTopic());
134                if (vListeners == null) {
135                        vListeners = new Vector<EventListener>();
136
137                        String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
138                        String reference = e.getTopic();
139
140                        System.out.println("EventDispatcher declaring fanout -> " + reference + " Binding with: " + queueName);
141
142                        channel.exchangeDeclare(reference, "fanout");
143                        channel.queueBind(queueName, reference, "");
144                }
145                vListeners.add(e);
146                listeners.put(e.getTopic(), vListeners);
147
148                return vListeners.size();
149        }
150
151        public int removeListener(EventListener e) {
152                Vector<EventListener> vListeners = listeners.get(e.getTopic());
153                if (vListeners != null) {
154                        // TODO: removeListener -> remove(e) override equals?
155                        vListeners.remove(e);
156                }
157
158                return vListeners.size();
159        }
160
161        /**
162         * This method dispatches the events. Every time an event is received, this
163         * method is launched. This method creates a new thread and executes the
164         * notifyEvent function of the listeners associated to this event
165         *
166         * @param topic
167         * @param event
168         */
169        public void dispatch(String topic, final Event event) {
170                if (listeners.containsKey(topic)) {
171                        for (final EventListener listener : listeners.get(topic)) {
172                                new Thread() {
173                                        public void run() {
174                                                listener.notifyEvent(event);
175                                        }
176                                }.start();
177                        }
178                }
179        }
180
181        public Map<String, Vector<EventListener>> getListeners() {
182                return listeners;
183        }
184
185        public void setListeners(Map<String, Vector<EventListener>> listeners) {
186                this.listeners = listeners;
187        }
188
189        public static boolean isVoid() {
190                return dispatcher == null;
191        }
192
193}
Note: See TracBrowser for help on using the repository browser.