source: trunk/src/main/java/omq/common/event/EventDispatcher.java @ 47

Last change on this file since 47 was 44, checked in by stoda, 11 years ago

Objectmq converted to maven project

File size: 6.0 KB
Line 
1package omq.common.event;
2
3import java.util.HashMap;
4import java.util.Map;
5import java.util.Properties;
6import java.util.Vector;
7
8import omq.common.broker.Broker;
9import omq.common.util.ParameterQueue;
10import omq.common.util.Serializer;
11
12import com.rabbitmq.client.Channel;
13import com.rabbitmq.client.ConsumerCancelledException;
14import com.rabbitmq.client.QueueingConsumer;
15import com.rabbitmq.client.QueueingConsumer.Delivery;
16import com.rabbitmq.client.ShutdownSignalException;
17
18/**
19 * This class dispatches the events received in the client side and stores them
20 * into the different listeners that could exists among the different proxies
21 * generated
22 *
23 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
24 *
25 */
26@SuppressWarnings("rawtypes")
27public class EventDispatcher extends Thread {
28        private static EventDispatcher dispatcher;
29
30        private Map<String, Vector<EventListener>> listeners;
31        private Channel channel;
32        private QueueingConsumer consumer;
33        private Properties env;
34        private boolean killed = false;
35
36        private EventDispatcher(Properties env) throws Exception {
37                this.env = env;
38
39                // Declare the listeners map
40                listeners = new HashMap<String, Vector<EventListener>>();
41
42                startEventQueue();
43
44        }
45
46        private void startEventQueue() throws Exception {
47                // Get a new connection and a new channel
48                channel = Broker.getNewChannel();
49
50                String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
51                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
52                channel.queueDeclare(event_queue, durable, false, false, null);
53
54                // Declare a new consumer
55                consumer = new QueueingConsumer(channel);
56                channel.basicConsume(event_queue, true, consumer);
57        }
58
59        public static void init(Properties env) throws Exception {
60                if (dispatcher == null) {
61                        dispatcher = new EventDispatcher(env);
62                        dispatcher.start();
63                } else {
64                        throw new Exception("Already initialized");
65                }
66        }
67
68        public static void stopEventDispatcher() throws Exception {
69                dispatcher.setListeners(null);
70                dispatcher.killed = true;
71                dispatcher.interrupt();
72                dispatcher.channel.close();
73                dispatcher = null;
74        }
75
76        public static EventDispatcher getDispatcher(Properties env) throws Exception {
77                if (dispatcher == null) {
78                        dispatcher = new EventDispatcher(env);
79                        dispatcher.start();
80                }
81                return dispatcher;
82        }
83
84        public static EventDispatcher getDispatcher() throws Exception {
85                if (dispatcher == null) {
86                        throw new Exception("EventDispatcher not initialized");
87                }
88                return dispatcher;
89        }
90
91        @Override
92        public void run() {
93                Delivery delivery;
94                Event event;
95
96                while (!killed) {
97                        try {
98                                // Get the delivery
99                                delivery = consumer.nextDelivery();
100
101                                // Get the event
102                                event = Serializer.deserializeEvent(delivery.getBody());
103
104                                System.out.println("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
105                                // Log.saveLog("Client-Deserialize", delivery.getBody());
106
107                                // long timeEnd = (new Date()).getTime();
108                                // Log.saveTimeSendRequestLog("Client-time-response",
109                                // event.getCorrId(), "Event!", timeEnd);
110
111                                // Dispatch it
112                                dispatch(event.getTopic(), event);
113                        } catch (InterruptedException i) {
114                                System.out.println("InterruptedException e: " + i);
115                                i.printStackTrace();
116                        } catch (ShutdownSignalException e) {
117                                System.out.println("ShutdownSignalException e: " + e);
118                                e.printStackTrace();
119                                try {
120                                        if (channel.isOpen()) {
121                                                channel.close();
122                                        }
123                                        startEventQueue();
124                                } catch (Exception e1) {
125                                        try {
126                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
127                                                Thread.sleep(milis);
128                                        } catch (InterruptedException e2) {
129                                                e2.printStackTrace();
130                                        }
131                                        e1.printStackTrace();
132                                }
133                        } catch (ConsumerCancelledException e) {
134                                System.out.println("ConsumerCancelledException e: " + e);
135                                e.printStackTrace();
136                        } catch (Exception e) {
137                                System.out.println("Exception e: " + e);
138                                e.printStackTrace();
139                        }
140                }
141        }
142
143        public int addListener(EventListener e) throws Exception {
144                // Map<String, ArrayList<EventListener<Event>>> mListeners =
145                // listeners.get(e.getTopic());
146                // if(mListeners == null){
147                // mListeners = new HashMap<String, ArrayList<EventListener<Event>>>();
148                //
149                // String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
150                // String reference = e.getTopic();
151                // channel.exchangeDeclare(reference, "fanout");
152                // channel.queueBind(queueName, reference, "");
153                // }
154
155                Vector<EventListener> vListeners = listeners.get(e.getTopic());
156                if (vListeners == null) {
157                        vListeners = new Vector<EventListener>();
158
159                        String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
160                        String reference = e.getTopic();
161
162                        System.out.println("EventDispatcher declaring fanout -> " + reference + " Binding with: " + queueName);
163
164                        channel.exchangeDeclare(reference, "fanout");
165                        channel.queueBind(queueName, reference, "");
166                }
167                vListeners.add(e);
168                listeners.put(e.getTopic(), vListeners);
169
170                return vListeners.size();
171        }
172
173        public int removeListener(EventListener e) {
174                Vector<EventListener> vListeners = listeners.get(e.getTopic());
175                if (vListeners != null) {
176                        // TODO: removeListener -> remove(e) override equals?
177                        vListeners.remove(e);
178                }
179
180                return vListeners.size();
181        }
182
183        /**
184         * This method dispatches the events. Every time an event is received, this
185         * method is launched. This method creates a new thread and executes the
186         * notifyEvent function of the listeners associated to this event
187         *
188         * @param topic
189         * @param event
190         */
191        public void dispatch(String topic, final Event event) {
192                if (listeners.containsKey(topic)) {
193                        for (final EventListener listener : listeners.get(topic)) {
194                                new Thread() {
195                                        @SuppressWarnings("unchecked")
196                                        public void run() {
197                                                listener.notifyEvent(event);
198                                        }
199                                }.start();
200                        }
201                }
202        }
203
204        public Map<String, Vector<EventListener>> getListeners() {
205                return listeners;
206        }
207
208        public void setListeners(Map<String, Vector<EventListener>> listeners) {
209                this.listeners = listeners;
210        }
211
212        public static boolean isVoid() {
213                return dispatcher == null;
214        }
215
216}
Note: See TracBrowser for help on using the repository browser.