package omq.common.event;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;

import omq.common.broker.Broker;
import omq.common.util.ParameterQueue;
import omq.common.util.Serializer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * This class dispatches the events received in the client side and stores them
 * into the different listeners that could exists among the different proxies
 * generated
 * 
 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
 * 
 */
@SuppressWarnings("rawtypes")
public class EventDispatcher extends Thread {
	private static EventDispatcher dispatcher;

	private Map<String, Vector<EventListener>> listeners;
	private Channel channel;
	private QueueingConsumer consumer;
	private Properties env;
	private boolean killed = false;

	private EventDispatcher(Properties env) throws Exception {
		this.env = env;

		// Declare the listeners map
		listeners = new HashMap<String, Vector<EventListener>>();

		startEventQueue();

	}

	private void startEventQueue() throws Exception {
		// Get a new connection and a new channel
		channel = Broker.getNewChannel();

		String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
		boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
		channel.queueDeclare(event_queue, durable, false, false, null);

		// Declare a new consumer
		consumer = new QueueingConsumer(channel);
		channel.basicConsume(event_queue, true, consumer);
	}

	public static void init(Properties env) throws Exception {
		if (dispatcher == null) {
			dispatcher = new EventDispatcher(env);
			dispatcher.start();
		} else {
			throw new Exception("Already initialized");
		}
	}

	public static void stopEventDispatcher() throws Exception {
		dispatcher.setListeners(null);
		dispatcher.killed = true;
		dispatcher.interrupt();
		dispatcher.channel.close();
		dispatcher = null;
	}

	public static EventDispatcher getDispatcher(Properties env) throws Exception {
		if (dispatcher == null) {
			dispatcher = new EventDispatcher(env);
			dispatcher.start();
		}
		return dispatcher;
	}

	public static EventDispatcher getDispatcher() throws Exception {
		if (dispatcher == null) {
			throw new Exception("EventDispatcher not initialized");
		}
		return dispatcher;
	}

	@Override
	public void run() {
		Delivery delivery;
		Event event;

		while (!killed) {
			try {
				// Get the delivery
				delivery = consumer.nextDelivery();

				// Get the event
				event = Serializer.deserializeEvent(delivery.getBody());

				System.out.println("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
				// Log.saveLog("Client-Deserialize", delivery.getBody());

				// long timeEnd = (new Date()).getTime();
				// Log.saveTimeSendRequestLog("Client-time-response",
				// event.getCorrId(), "Event!", timeEnd);

				// Dispatch it
				dispatch(event.getTopic(), event);
			} catch (InterruptedException i) {
				System.out.println("InterruptedException e: " + i);
				i.printStackTrace();
			} catch (ShutdownSignalException e) {
				System.out.println("ShutdownSignalException e: " + e);
				e.printStackTrace();
				try {
					if (channel.isOpen()) {
						channel.close();
					}
					startEventQueue();
				} catch (Exception e1) {
					try {
						long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
						Thread.sleep(milis);
					} catch (InterruptedException e2) {
						e2.printStackTrace();
					}
					e1.printStackTrace();
				}
			} catch (ConsumerCancelledException e) {
				System.out.println("ConsumerCancelledException e: " + e);
				e.printStackTrace();
			} catch (Exception e) {
				System.out.println("Exception e: " + e);
				e.printStackTrace();
			}
		}
	}

	public int addListener(EventListener e) throws Exception {
		// Map<String, ArrayList<EventListener<Event>>> mListeners =
		// listeners.get(e.getTopic());
		// if(mListeners == null){
		// mListeners = new HashMap<String, ArrayList<EventListener<Event>>>();
		//
		// String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
		// String reference = e.getTopic();
		// channel.exchangeDeclare(reference, "fanout");
		// channel.queueBind(queueName, reference, "");
		// }

		Vector<EventListener> vListeners = listeners.get(e.getTopic());
		if (vListeners == null) {
			vListeners = new Vector<EventListener>();

			String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
			String reference = e.getTopic();

			System.out.println("EventDispatcher declaring fanout -> " + reference + " Binding with: " + queueName);

			channel.exchangeDeclare(reference, "fanout");
			channel.queueBind(queueName, reference, "");
		}
		vListeners.add(e);
		listeners.put(e.getTopic(), vListeners);

		return vListeners.size();
	}

	public int removeListener(EventListener e) {
		Vector<EventListener> vListeners = listeners.get(e.getTopic());
		if (vListeners != null) {
			// TODO: removeListener -> remove(e) override equals?
			vListeners.remove(e);
		}

		return vListeners.size();
	}

	/**
	 * This method dispatches the events. Every time an event is received, this
	 * method is launched. This method creates a new thread and executes the
	 * notifyEvent function of the listeners associated to this event
	 * 
	 * @param topic
	 * @param event
	 */
	public void dispatch(String topic, final Event event) {
		if (listeners.containsKey(topic)) {
			for (final EventListener listener : listeners.get(topic)) {
				new Thread() {
					@SuppressWarnings("unchecked")
					public void run() {
						listener.notifyEvent(event);
					}
				}.start();
			}
		}
	}

	public Map<String, Vector<EventListener>> getListeners() {
		return listeners;
	}

	public void setListeners(Map<String, Vector<EventListener>> listeners) {
		this.listeners = listeners;
	}

	public static boolean isVoid() {
		return dispatcher == null;
	}

}
