/*
 * Decompiled with CFR 0.152.
 */
package omq.common.event;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;
import omq.common.broker.Broker;
import omq.common.event.Event;
import omq.common.event.EventListener;
import omq.common.util.ParameterQueue;
import omq.common.util.Serializer;
import org.apache.log4j.Logger;

public class EventDispatcher
extends Thread {
    private static final Logger logger = Logger.getLogger((String)EventDispatcher.class.getName());
    private static EventDispatcher dispatcher;
    private Map<String, Vector<EventListener>> listeners;
    private Channel channel;
    private QueueingConsumer consumer;
    private Properties env;
    private boolean killed = false;

    private EventDispatcher(Properties env) throws Exception {
        this.env = env;
        this.listeners = new HashMap<String, Vector<EventListener>>();
        this.startEventQueue();
    }

    private void startEventQueue() throws Exception {
        this.channel = Broker.getNewChannel();
        String event_queue = this.env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
        boolean durable = Boolean.parseBoolean(this.env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
        this.channel.queueDeclare(event_queue, durable, false, false, null);
        this.consumer = new QueueingConsumer(this.channel);
        this.channel.basicConsume(event_queue, true, (Consumer)this.consumer);
    }

    public static void init(Properties env) throws Exception {
        if (dispatcher != null) {
            throw new Exception("Already initialized");
        }
        dispatcher = new EventDispatcher(env);
        dispatcher.start();
    }

    public static void stopEventDispatcher() throws Exception {
        logger.warn((Object)"Stopping EventDispatcher");
        dispatcher.setListeners(null);
        EventDispatcher.dispatcher.killed = true;
        dispatcher.interrupt();
        EventDispatcher.dispatcher.channel.close();
        dispatcher = null;
    }

    public static EventDispatcher getDispatcher(Properties env) throws Exception {
        if (dispatcher == null) {
            dispatcher = new EventDispatcher(env);
            dispatcher.start();
        }
        return dispatcher;
    }

    public static EventDispatcher getDispatcher() throws Exception {
        if (dispatcher == null) {
            throw new Exception("EventDispatcher not initialized");
        }
        return dispatcher;
    }

    @Override
    public void run() {
        while (!this.killed) {
            try {
                QueueingConsumer.Delivery delivery = this.consumer.nextDelivery();
                Event event = Serializer.deserializeEvent(delivery.getBody());
                logger.info((Object)("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId()));
                this.dispatch(event.getTopic(), event);
            }
            catch (InterruptedException i) {
                logger.error((Object)i);
            }
            catch (ShutdownSignalException e) {
                logger.error((Object)e);
                try {
                    if (this.channel.isOpen()) {
                        this.channel.close();
                    }
                    this.startEventQueue();
                }
                catch (Exception e1) {
                    try {
                        long milis = Long.parseLong(this.env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
                        Thread.sleep(milis);
                    }
                    catch (InterruptedException e2) {
                        logger.error((Object)e2);
                    }
                    logger.error((Object)e1);
                }
            }
            catch (ConsumerCancelledException e) {
                logger.error((Object)e);
            }
            catch (Exception e) {
                logger.error((Object)e);
            }
        }
    }

    public int addListener(EventListener e) throws Exception {
        Vector vListeners = this.listeners.get(e.getTopic());
        if (vListeners == null) {
            vListeners = new Vector();
            String queueName = this.env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
            String reference = e.getTopic();
            logger.info((Object)("Declaring fanout -> " + reference + " Binding with: " + queueName));
            this.channel.exchangeDeclare(reference, "fanout");
            this.channel.queueBind(queueName, reference, "");
        }
        vListeners.add(e);
        this.listeners.put(e.getTopic(), vListeners);
        return vListeners.size();
    }

    public int removeListener(EventListener e) {
        Vector<EventListener> vListeners = this.listeners.get(e.getTopic());
        if (vListeners != null) {
            vListeners.remove(e);
        }
        return vListeners.size();
    }

    public void dispatch(String topic, final Event event) {
        if (this.listeners.containsKey(topic)) {
            for (final EventListener listener : this.listeners.get(topic)) {
                new Thread(){

                    @Override
                    public void run() {
                        listener.notifyEvent(event);
                    }
                }.start();
            }
        }
    }

    public Map<String, Vector<EventListener>> getListeners() {
        return this.listeners;
    }

    public void setListeners(Map<String, Vector<EventListener>> listeners) {
        this.listeners = listeners;
    }

    public static boolean isVoid() {
        return dispatcher == null;
    }
}

