package omq.common.broker;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import omq.Remote;
import omq.client.listener.ResponseListener;
import omq.client.proxy.Proxymq;
import omq.common.event.Event;
import omq.common.event.EventDispatcher;
import omq.common.event.EventWrapper;
import omq.common.util.Environment;
import omq.common.util.OmqConnectionFactory;
import omq.common.util.ParameterQueue;
import omq.common.util.Serializer;
import omq.exception.InitBrokerException;
import omq.exception.RemoteException;
import omq.exception.SerializerException;
import omq.server.RemoteObject;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * Static facade that owns the single RabbitMQ {@link Connection} and
 * {@link Channel} shared by the middleware. It is used to bind server-side
 * {@link RemoteObject}s, to look up client-side proxies and to publish events.
 * <p>
 * All state is kept in static fields, so there is one Broker per class loader.
 */
public class Broker {
	// These three fields are also touched from the shutdown listener callback
	// (presumably invoked on a RabbitMQ client thread -- TODO confirm), so they
	// are volatile to make updates visible across threads.
	private static volatile Connection connection;
	private static volatile Channel channel;
	private static boolean clientStarted = false;
	private static volatile boolean connectionClosed = false;
	// TODO ask Pedro if it can be only one object in the map (an object can
	// have multiple threads in the same broker -see environment-)
	private static Map<String, RemoteObject> remoteObjs;

	/**
	 * Initializes the Broker with the given environment: stores the
	 * environment, opens the connection and the shared channel, installs the
	 * fault-tolerance listener and finally pings the server to verify that the
	 * connection works.
	 * 
	 * @param env
	 *            properties used to create the connection
	 * @throws Exception
	 *             an {@link InitBrokerException} if the Broker was already
	 *             started or the ping failed; any exception raised while
	 *             creating the connection or the channel is propagated as is
	 */
	public static synchronized void initBroker(Properties env) throws Exception {
		if (!Environment.isVoid()) {
			throw new InitBrokerException("Broker already started");
		}

		remoteObjs = new HashMap<String, RemoteObject>();
		Environment.setEnvironment(env);
		// A fresh start must re-enable the fault tolerance
		connectionClosed = false;
		connection = OmqConnectionFactory.getNewConnection(env);
		channel = connection.createChannel();
		addFaultTolerance();
		try {
			tryConnection(env);
		} catch (Exception e) {
			// The cause is printed because the exception thrown below only
			// carries a message
			e.printStackTrace();
			// Mark the connection as deliberately closed BEFORE closing it,
			// otherwise the shutdown listener would try to reconnect
			connectionClosed = true;
			// Best-effort cleanup: a failure here must neither skip the
			// connection close nor mask the InitBrokerException
			try {
				channel.close();
			} catch (Exception closeError) {
				closeError.printStackTrace();
			}
			try {
				connection.close();
			} catch (Exception closeError) {
				closeError.printStackTrace();
			}
			// NOTE(review): the environment stays set after this failure, so a
			// second initBroker call will report "Broker already started"
			throw new InitBrokerException("The connection didn't work");
		}
	}

	/**
	 * Stops the client side listeners (if a lookup started them), kills every
	 * bound remote object and closes the connection.
	 * 
	 * @throws Exception
	 *             if any of the components cannot be stopped
	 */
	public static void stopBroker() throws Exception {
		// Stop the client
		if (clientStarted) {
			ResponseListener.stopResponseListner();
			EventDispatcher.stopEventDispatcher();
		}
		// Stop all the remote objects working. unbind removes the entry from
		// the map, so iterate over a snapshot of the keys.
		if (remoteObjs != null) {
			String[] references = remoteObjs.keySet().toArray(new String[0]);
			for (String reference : references) {
				unbind(reference);
			}
		}
		// Close the connection once all the listeners have died
		closeConnection();
	}

	/**
	 * @return Broker's connection, or null if the Broker has not been
	 *         initialized
	 * @throws Exception
	 */
	public static Connection getConnection() throws Exception {
		return connection;
	}

	/**
	 * Closes the Broker's connection on purpose. The flag set here tells the
	 * shutdown listener not to reconnect.
	 * 
	 * @throws IOException
	 *             if the connection cannot be closed
	 */
	public static void closeConnection() throws IOException {
		connectionClosed = true;
		// Nothing to close when the Broker was never initialized
		if (connection != null) {
			connection.close();
		}
	}

	/**
	 * 
	 * @return Broker's channel, or null if the Broker has not been initialized
	 * @throws Exception
	 */
	public static Channel getChannel() throws Exception {
		return channel;
	}

	/**
	 * Creates a new channel using the Broker's connection
	 * 
	 * @return newChannel
	 * @throws IOException
	 */
	public static Channel getNewChannel() throws IOException {
		return connection.createChannel();
	}

	/**
	 * Returns a proxy implementing {@code contract} for the remote object named
	 * {@code reference}. The first lookup starts the client side listeners. An
	 * existing proxy for the same reference is reused.
	 * 
	 * @param reference
	 *            name the remote object was bound with
	 * @param contract
	 *            remote interface the proxy has to implement
	 * @return a proxy for the remote object
	 * @throws RemoteException
	 *             wrapping any failure raised while creating the proxy
	 */
	@SuppressWarnings("unchecked")
	public static <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
		try {
			Properties environment = Environment.getEnvironment();

			if (!clientStarted) {
				initClient(environment);
				clientStarted = true;
			}

			if (!Proxymq.containsProxy(reference)) {
				Proxymq proxy = new Proxymq(reference, contract, environment);
				Class<?>[] array = { contract };
				return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
			}
			return (T) Proxymq.getInstance(reference);

		} catch (Exception e) {
			throw new RemoteException(e);
		}
	}

	/**
	 * Starts {@code remote} under the name {@code reference} and registers it
	 * in the Broker.
	 * 
	 * @param reference
	 *            name the remote object will be reachable by
	 * @param remote
	 *            object to start
	 * @throws RemoteException
	 *             wrapping any failure raised while starting the object
	 */
	public static void bind(String reference, RemoteObject remote) throws RemoteException {
		try {
			Properties environment = Environment.getEnvironment();
			remote.startRemoteObject(reference, environment);
			remoteObjs.put(reference, remote);
		} catch (Exception e) {
			throw new RemoteException(e);
		}
	}

	/**
	 * Kills the remote object bound to {@code reference} and removes it from
	 * the Broker's registry.
	 * 
	 * @param reference
	 *            name the remote object was bound with
	 * @throws RemoteException
	 *             if no object is bound to {@code reference}
	 * @throws IOException
	 *             if the remote object cannot be killed
	 */
	public static void unbind(String reference) throws RemoteException, IOException {
		RemoteObject remote = remoteObjs.get(reference);
		if (remote == null) {
			throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
		}
		remote.kill();
		// Drop the entry only after a successful kill so that a stale object
		// is never reported as bound
		remoteObjs.remove(reference);
	}

	/**
	 * Not implemented yet: this method currently does nothing.
	 * 
	 * @param name
	 * @param obj
	 * @throws RemoteException
	 */
	public void rebind(String name, Remote obj) throws RemoteException {

	}

	/**
	 * This method ensures the client will have only one ResponseListener and
	 * only one EventDispatcher. Both with the same environment.
	 * 
	 * @param environment
	 * @throws Exception
	 */
	private static synchronized void initClient(Properties environment) throws Exception {
		if (ResponseListener.isVoid()) {
			ResponseListener.init(environment);
		}
		if (EventDispatcher.isVoid()) {
			EventDispatcher.init(environment);
		}
	}

	/**
	 * This method sends an event with its information. The event is wrapped,
	 * serialized and published to a fanout exchange named after the event's
	 * topic.
	 * 
	 * @param event
	 * @throws IOException
	 * @throws SerializerException
	 */
	public static void trigger(Event event) throws IOException, SerializerException {
		String UID = event.getTopic();
		EventWrapper wrapper = new EventWrapper(event);
		System.out.println("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
		channel.exchangeDeclare(UID, "fanout");

		byte[] bytesResponse = Serializer.serialize(wrapper);
		channel.basicPublish(UID, "", null, bytesResponse);

		// Log.saveLog("Server-Serialize", bytesResponse);
	}

	/**
	 * This function is used to send a ping message to see if the connection
	 * works. It publishes "ping" to a temporary exchange/queue on a temporary
	 * channel and waits up to one second to read it back.
	 * 
	 * @param env
	 *            environment; its user name property is used to name the
	 *            temporary exchange and queue
	 * @throws Exception
	 *             an {@link IOException} if the ping does not come back in time
	 *             or comes back with a different body; any exception raised by
	 *             the channel operations is propagated
	 */
	public static void tryConnection(Properties env) throws Exception {
		String message = "ping";

		String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
		String queueName = exchange;
		String routingKey = "routingKey";

		// Dedicated channel so that a failure here cannot break the shared one
		Channel pingChannel = connection.createChannel();
		Delivery delivery;
		try {
			pingChannel.exchangeDeclare(exchange, "direct");
			pingChannel.queueDeclare(queueName, false, false, false, null);
			pingChannel.queueBind(queueName, exchange, routingKey);

			pingChannel.basicPublish(exchange, routingKey, null, message.getBytes());

			QueueingConsumer consumer = new QueueingConsumer(pingChannel);

			pingChannel.basicConsume(queueName, true, consumer);
			// Returns null when nothing arrives within the timeout
			delivery = consumer.nextDelivery(1000);
		} finally {
			// Best-effort cleanup of the temporary resources, executed even
			// when the ping itself failed
			try {
				if (pingChannel.isOpen()) {
					pingChannel.exchangeDelete(exchange);
					pingChannel.queueDelete(queueName);
					pingChannel.close();
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		if (delivery == null || !message.equalsIgnoreCase(new String(delivery.getBody()))) {
			throw new IOException("Ping initialitzation has failed");
		}
	}

	/**
	 * This method adds a ShutdownListener to the Broker's connection. When this
	 * connection falls, a new connection will be created and this will also
	 * have the listener. Nothing is done when the connection was closed on
	 * purpose (see {@link #closeConnection()}).
	 */
	private static void addFaultTolerance() {
		connection.addShutdownListener(new ShutdownListener() {
			@Override
			public void shutdownCompleted(ShutdownSignalException cause) {
				if (connectionClosed) {
					// Deliberate shutdown: nothing to recover
					return;
				}

				if (cause.isHardError()) {
					// Connection level error: discard the old connection and
					// build a new one with the same listener
					if (connection.isOpen()) {
						try {
							connection.close();
						} catch (IOException e) {
							e.printStackTrace();
						}
					}
					try {
						Properties env = Environment.getEnvironment();
						connection = OmqConnectionFactory.getNewWorkingConnection(env);
						channel = connection.createChannel();
						addFaultTolerance();
					} catch (Exception e) {
						e.printStackTrace();
					}
				} else {
					// Channel level error: only the failing channel is closed
					Object reference = cause.getReference();
					if (reference instanceof Channel) {
						Channel failedChannel = (Channel) reference;
						if (failedChannel.isOpen()) {
							try {
								failedChannel.close();
							} catch (IOException e) {
								e.printStackTrace();
							}
						}
					}
				}
			}
		});
	}

}
