source: branches/objectmq_old/src/omq/common/event/EventDispatcher.java

Last change on this file was 36, checked in by stoda, 11 years ago

Fault tolerance in the client side added.
Some refactoring in proxymq, broker, environment and the client listeners

File size: 6.0 KB
Line 
1package omq.common.event;
2
3import java.util.HashMap;
4import java.util.Map;
5import java.util.Properties;
6import java.util.Vector;
7
8import omq.common.broker.Broker;
9import omq.common.util.ParameterQueue;
10import omq.common.util.Serializer;
11
12import com.rabbitmq.client.Channel;
13import com.rabbitmq.client.ConsumerCancelledException;
14import com.rabbitmq.client.QueueingConsumer;
15import com.rabbitmq.client.QueueingConsumer.Delivery;
16import com.rabbitmq.client.ShutdownSignalException;
17
18/**
19 * This class dispatches the events received in the client side and stores them
20 * into the different listeners that could exists among the different proxies
21 * generated
22 *
23 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
24 *
25 */
26@SuppressWarnings("rawtypes")
27public class EventDispatcher extends Thread {
28        private static EventDispatcher dispatcher;
29
30        private Map<String, Vector<EventListener>> listeners;
31        private Channel channel;
32        private QueueingConsumer consumer;
33        private Properties env;
34        private boolean killed = false;
35
36        private EventDispatcher(Properties env) throws Exception {
37                this.env = env;
38
39                // Declare the listeners map
40                listeners = new HashMap<String, Vector<EventListener>>();
41
42                startEventQueue();
43
44        }
45
46        private void startEventQueue() throws Exception {
47                // Get a new connection and a new channel
48                channel = Broker.getNewChannel();
49
50                String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
51                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
52                channel.queueDeclare(event_queue, durable, false, false, null);
53
54                // Declare a new consumer
55                consumer = new QueueingConsumer(channel);
56                channel.basicConsume(event_queue, true, consumer);
57        }
58
59        public static void init(Properties env) throws Exception {
60                if (dispatcher == null) {
61                        dispatcher = new EventDispatcher(env);
62                        dispatcher.start();
63                } else {
64                        throw new Exception("Already initialized");
65                }
66        }
67
68        public static void stopEventDispatcher() throws Exception {
69                dispatcher.setListeners(null);
70                dispatcher.killed = true;
71                dispatcher.interrupt();
72                dispatcher.channel.close();
73                dispatcher = null;
74        }
75
76        public static EventDispatcher getDispatcher(Properties env) throws Exception {
77                if (dispatcher == null) {
78                        dispatcher = new EventDispatcher(env);
79                        dispatcher.start();
80                }
81                return dispatcher;
82        }
83
84        public static EventDispatcher getDispatcher() throws Exception {
85                if (dispatcher == null) {
86                        throw new Exception("EventDispatcher not initialized");
87                }
88                return dispatcher;
89        }
90
91        @Override
92        public void run() {
93                Delivery delivery;
94                Event event;
95
96                while (!killed) {
97                        try {
98                                // Get the delivery
99                                delivery = consumer.nextDelivery();
100
101                                // Get the event
102                                event = Serializer.deserializeEvent(delivery.getBody());
103
104                                System.out.println("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
105                                // Log.saveLog("Client-Deserialize", delivery.getBody());
106
107                                // long timeEnd = (new Date()).getTime();
108                                // Log.saveTimeSendRequestLog("Client-time-response",
109                                // event.getCorrId(), "Event!", timeEnd);
110
111                                // Dispatch it
112                                dispatch(event.getTopic(), event);
113                        } catch (InterruptedException i) {
114                                System.out.println("InterruptedException e: " + i);
115                                i.printStackTrace();
116                        } catch (ShutdownSignalException e) {
117                                System.out.println("ShutdownSignalException e: " + e);
118                                e.printStackTrace();
119                                try {
120                                        if (channel.isOpen()) {
121                                                channel.close();
122                                        }
123                                        startEventQueue();
124                                } catch (Exception e1) {
125                                        try {
126                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
127                                                Thread.sleep(milis);
128                                        } catch (InterruptedException e2) {
129                                                e2.printStackTrace();
130                                        }
131                                        e1.printStackTrace();
132                                }
133                        } catch (ConsumerCancelledException e) {
134                                System.out.println("ConsumerCancelledException e: " + e);
135                                e.printStackTrace();
136                        } catch (Exception e) {
137                                System.out.println("Exception e: " + e);
138                                e.printStackTrace();
139                        }
140                }
141        }
142
143        public int addListener(EventListener e) throws Exception {
144                // Map<String, ArrayList<EventListener<Event>>> mListeners =
145                // listeners.get(e.getTopic());
146                // if(mListeners == null){
147                // mListeners = new HashMap<String, ArrayList<EventListener<Event>>>();
148                //
149                // String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
150                // String reference = e.getTopic();
151                // channel.exchangeDeclare(reference, "fanout");
152                // channel.queueBind(queueName, reference, "");
153                // }
154
155                Vector<EventListener> vListeners = listeners.get(e.getTopic());
156                if (vListeners == null) {
157                        vListeners = new Vector<EventListener>();
158
159                        String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
160                        String reference = e.getTopic();
161
162                        System.out.println("EventDispatcher declaring fanout -> " + reference + " Binding with: " + queueName);
163
164                        channel.exchangeDeclare(reference, "fanout");
165                        channel.queueBind(queueName, reference, "");
166                }
167                vListeners.add(e);
168                listeners.put(e.getTopic(), vListeners);
169
170                return vListeners.size();
171        }
172
173        public int removeListener(EventListener e) {
174                Vector<EventListener> vListeners = listeners.get(e.getTopic());
175                if (vListeners != null) {
176                        // TODO: removeListener -> remove(e) override equals?
177                        vListeners.remove(e);
178                }
179
180                return vListeners.size();
181        }
182
183        /**
184         * This method dispatches the events. Every time an event is received, this
185         * method is launched. This method creates a new thread and executes the
186         * notifyEvent function of the listeners associated to this event
187         *
188         * @param topic
189         * @param event
190         */
191        public void dispatch(String topic, final Event event) {
192                if (listeners.containsKey(topic)) {
193                        for (final EventListener listener : listeners.get(topic)) {
194                                new Thread() {
195                                        @SuppressWarnings("unchecked")
196                                        public void run() {
197                                                listener.notifyEvent(event);
198                                        }
199                                }.start();
200                        }
201                }
202        }
203
204        public Map<String, Vector<EventListener>> getListeners() {
205                return listeners;
206        }
207
208        public void setListeners(Map<String, Vector<EventListener>> listeners) {
209                this.listeners = listeners;
210        }
211
212        public static boolean isVoid() {
213                return dispatcher == null;
214        }
215
216}
Note: See TracBrowser for help on using the repository browser.