package omq.common.event; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Vector; import omq.common.broker.Broker; import omq.common.util.ParameterQueue; import omq.common.util.Serializer; import org.apache.log4j.Logger; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; /** * This class dispatches the events received in the client side and stores them * into the different listeners that could exists among the different proxies * generated * * @author Sergi Toda * */ @SuppressWarnings("rawtypes") public class EventDispatcher extends Thread { private static final Logger logger = Logger.getLogger(EventDispatcher.class.getName()); private static EventDispatcher dispatcher; private Map> listeners; private Channel channel; private QueueingConsumer consumer; private Properties env; private boolean killed = false; private EventDispatcher(Properties env) throws Exception { this.env = env; // Declare the listeners map listeners = new HashMap>(); startEventQueue(); } private void startEventQueue() throws Exception { // Get a new connection and a new channel channel = Broker.getNewChannel(); String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE); boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false")); channel.queueDeclare(event_queue, durable, false, false, null); // Declare a new consumer consumer = new QueueingConsumer(channel); channel.basicConsume(event_queue, true, consumer); } public static void init(Properties env) throws Exception { if (dispatcher == null) { dispatcher = new EventDispatcher(env); dispatcher.start(); } else { throw new Exception("Already initialized"); } } public static void stopEventDispatcher() throws Exception { logger.warn("Stopping EventDispatcher"); dispatcher.setListeners(null); dispatcher.killed = true; dispatcher.interrupt(); dispatcher.channel.close(); dispatcher = null; } public static EventDispatcher getDispatcher(Properties env) throws Exception { if (dispatcher == null) { dispatcher = new EventDispatcher(env); dispatcher.start(); } return dispatcher; } public static EventDispatcher getDispatcher() throws Exception { if (dispatcher == null) { throw new Exception("EventDispatcher not initialized"); } return dispatcher; } @Override public void run() { Delivery delivery; Event event; while (!killed) { try { // Get the delivery delivery = consumer.nextDelivery(); // Get the event event = Serializer.deserializeEvent(delivery.getBody()); logger.info("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId()); // Log.saveLog("Client-Deserialize", delivery.getBody()); // long timeEnd = (new Date()).getTime(); // Log.saveTimeSendRequestLog("Client-time-response", // event.getCorrId(), "Event!", timeEnd); // Dispatch it dispatch(event.getTopic(), event); } catch (InterruptedException i) { logger.error(i); } catch (ShutdownSignalException e) { logger.error(e); try { if (channel.isOpen()) { channel.close(); } startEventQueue(); } catch (Exception e1) { try { long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); Thread.sleep(milis); } catch (InterruptedException e2) { logger.error(e2); } logger.error(e1); } } catch (ConsumerCancelledException e) { logger.error(e); } catch (Exception e) { logger.error(e); } } } public int addListener(EventListener e) throws Exception { // Map>> mListeners = // listeners.get(e.getTopic()); // if(mListeners == null){ // mListeners = new HashMap>>(); // // String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE); // String reference = e.getTopic(); // channel.exchangeDeclare(reference, "fanout"); // channel.queueBind(queueName, reference, ""); // } Vector vListeners = listeners.get(e.getTopic()); if (vListeners == null) { vListeners = new Vector(); String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE); String reference = e.getTopic(); logger.info("Declaring fanout -> " + reference + " Binding with: " + queueName); channel.exchangeDeclare(reference, "fanout"); channel.queueBind(queueName, reference, ""); } vListeners.add(e); listeners.put(e.getTopic(), vListeners); return vListeners.size(); } public int removeListener(EventListener e) { Vector vListeners = listeners.get(e.getTopic()); if (vListeners != null) { // TODO: removeListener -> remove(e) override equals? vListeners.remove(e); } return vListeners.size(); } /** * This method dispatches the events. Every time an event is received, this * method is launched. This method creates a new thread and executes the * notifyEvent function of the listeners associated to this event * * @param topic * @param event */ public void dispatch(String topic, final Event event) { if (listeners.containsKey(topic)) { for (final EventListener listener : listeners.get(topic)) { new Thread() { @SuppressWarnings("unchecked") public void run() { listener.notifyEvent(event); } }.start(); } } } public Map> getListeners() { return listeners; } public void setListeners(Map> listeners) { this.listeners = listeners; } public static boolean isVoid() { return dispatcher == null; } }