package omq.client.proxy;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collection;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.Properties;

import omq.Remote;
import omq.client.annotation.AsyncMethod;
import omq.client.annotation.SyncMethod;
import omq.client.listener.ResponseListener;
import omq.common.broker.Broker;
import omq.common.event.Event;
import omq.common.event.EventDispatcher;
import omq.common.event.EventListener;
import omq.common.message.Request;
import omq.common.message.Response;
import omq.common.util.ParameterQueue;
import omq.common.util.Serializer;
import omq.exception.NoContainsInstanceException;
import omq.exception.OmqException;
import omq.exception.RetryException;
import omq.exception.SerializerException;
import omq.exception.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * EvoProxy class. This class inherits from InvocationHandler and gives you a
 * proxy with a server using an environment
 * 
 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
 * 
 */
public class Proxymq implements InvocationHandler, Remote {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	private static Map<String, Object> proxies = new Hashtable<String, Object>();

	private String uid;
	private transient ResponseListener rListener;
	private transient EventDispatcher dispatcher;
	// private transient Channel channel;
	private transient Properties env;
	private transient Map<String, byte[]> results;
	private transient Map<String, EventListener<?>> listeners;

	private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
	static {
		primitiveClasses.put("byte", Byte.class);
		primitiveClasses.put("short", Short.class);
		primitiveClasses.put("char", Character.class);
		primitiveClasses.put("int", Integer.class);
		primitiveClasses.put("long", Long.class);
		primitiveClasses.put("float", Float.class);
		primitiveClasses.put("double", Double.class);
	}

	/**
	 * EvoProxy Constructor.
	 * 
	 * This constructor uses an uid to know which object will call. It also uses
	 * Properties to set where to send the messages
	 * 
	 * @param uid
	 *            The uid represents the unique identifier of a remote object
	 * @param clazz
	 *            It represents the real class of the remote object. With this
	 *            class the system can know the remoteInterface used and it can
	 *            also see which annotations are used
	 * @param env
	 *            The environment is used to know where to send the messages
	 * @throws Exception
	 */
	public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception {
		this.uid = uid;
		this.rListener = ResponseListener.getRequestListener();
		this.dispatcher = EventDispatcher.getDispatcher();

		// TODO what is better to use a new channel or to use the same?
		// this.channel = Broker.getChannel();
		this.env = env;

		listeners = new HashMap<String, EventListener<?>>();

		// Create a new hashmap and registry it in rListener
		results = new HashMap<String, byte[]>();
		rListener.registerProxy(this);
	}

	@Override
	public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
		// long timeStart = (new Date()).getTime();

		// Local methods only
		String methodName = method.getName();

		// The local methods will be invoked here
		if (method.getDeclaringClass().equals(Remote.class)) {
			if (methodName.equals("getRef")) {
				return getRef();
			} else if (methodName.equals("addListener")) {
				addListener((EventListener<?>) arguments[0]);
				return null;
			} else if (methodName.equals("removeListener")) {
				removeListener((EventListener<?>) arguments[0]);
				return null;
			} else if (methodName.equals("getListeners")) {
				return getListeners();
			}
		}

		// Create the request
		Request request = createRequest(method, arguments);

		// Log.saveTimeSendRequestLog("Client-time-request", request.getId(),
		// method.getName(), timeStart);

		Object response = null;
		// Publish the request
		if (request.isAsync()) {
			System.out.println("Publish async request -> " + request.getId());
			publishAsyncRequest(request);
		} else {
			System.out.println("Publish sync request -> " + request.getId());
			response = publishSyncRequest(request, method.getReturnType());

			// long timeEnd = (new Date()).getTime();
			// Log.saveTimeSendRequestLog("Client-time-response",
			// request.getId(), method.getName(), timeEnd);
		}

		return response;
	}

	private void publishMessage(Request request, String replyQueueName) throws Exception {
		String corrId = request.getId();

		// Get the environment properties
		String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
		String routingkey = this.uid;

		// Add the correlation ID and create a replyTo property
		BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).build();

		// Publish the message
		byte[] bytesRequest = Serializer.serialize(request);
		// TODO See this
		// channel.basicPublish(exchange, routingkey, props, bytesRequest);
		Broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
		// Log.saveLog("Client-Serialize", bytesRequest);
	}

	private void publishAsyncRequest(Request request) throws Exception {
		// Get the environment properties
		String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
		publishMessage(request, replyQueueName);
	}

	private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
		String corrId = request.getId();

		int retries = request.getRetries();
		long timeout = request.getTimeout();

		// Get the environment properties
		String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);

		// Publish the message
		int i = 0;
		while (i < retries) {
			try {
				publishMessage(request, replyQueueName);
				return getResult(corrId, timeout, type);
			} catch (TimeoutException te) {
				System.out.println("Timeout exception catched " + te);
				te.printStackTrace();
			}
			i++;
		}
		throw new RetryException(retries, timeout);
	}

	private Request createRequest(Method method, Object[] arguments) {
		String corrId = java.util.UUID.randomUUID().toString();
		String methodName = method.getName();

		// Since we need to know whether the method is async and if it has to
		// return using an annotation, we'll only check the AsyncMethod
		// annotation
		if (method.getAnnotation(AsyncMethod.class) == null) {
			int retries = 1;
			long timeout = ParameterQueue.DEFAULT_TIMEOUT;
			if (method.getAnnotation(SyncMethod.class) != null) {
				SyncMethod sync = method.getAnnotation(SyncMethod.class);
				retries = sync.retry();
				timeout = sync.timeout();
			}
			return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout);
		} else {
			return Request.newAsyncRequest(corrId, methodName, arguments);
		}
	}

	private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
		Response resp = null;

		// Wait for the results.
		long localTimeout = 0;
		long start = System.currentTimeMillis();
		synchronized (results) {
			// Due to we are using notifyAll(), we need to control the real time
			while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
				results.wait(timeout);
				localTimeout = System.currentTimeMillis() - start;
			}
			if ((timeout - localTimeout) <= 0) {
				throw new TimeoutException("Timeout exception time: " + timeout);
			}
			resp = Serializer.deserializeResponse(results.get(corrId), type);
			// Log.saveLog("Client-Deserialize", results.get(corrId));

			// Remove and indicate the key exists (a hashmap can contain a null
			// object, using this we'll know whether a response has been
			// received before)
			results.put(corrId, null);
		}

		if (resp.getError() != null) {
			OmqException error = resp.getError();
			String name = error.getType();
			String message = error.getMessage();
			throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
		}

		return resp.getResult();
	}

	/**
	 * 
	 * @param reference
	 *            RemoteObject reference
	 * @return true if the proxy has been created before or false in the other
	 *         case
	 */
	public static boolean containsProxy(String reference) {
		return proxies.containsKey(reference);
	}

	/**
	 * 
	 * @param reference
	 *            RemoteObject reference
	 * @return a proxy instance
	 * @throws NoContainsInstanceException
	 */
	public static Object getInstance(String reference) throws NoContainsInstanceException {
		if (!containsProxy(reference)) {
			throw new NoContainsInstanceException(reference);
		}
		return proxies.get(reference);
	}

	/**
	 * Returns an instance of a proxy class for the specified interfaces that
	 * dispatches method invocations to the specified invocation handler. * @param
	 * loader
	 * 
	 * @param loader
	 *            the class loader to define the proxy class
	 * 
	 * @param interfaces
	 *            the list of interfaces for the proxy class to implement
	 * @param proxy
	 *            the invocation handler to dispatch method invocations to
	 * @return a proxy instance with the specified invocation handler of a proxy
	 *         class that is defined by the specified class loader and that
	 *         implements the specified interfaces
	 */
	public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
		if (proxies.containsKey(proxy.getRef())) {
			System.out.println("Proxy trobat");
			return proxies.get(proxy.getRef());
		}
		Object value = Proxy.newProxyInstance(loader, interfaces, proxy);
		proxies.put(proxy.getRef(), value);
		return value;
	}

	/**
	 * Gets the Map used internally to retreive the response of the server
	 * 
	 * @return a map with all the keys processed. Every key is a correlation id
	 *         of a method invoked remotely
	 */
	public Map<String, byte[]> getResults() {
		return results;
	}

	@Override
	public String getRef() {
		return uid;
	}

	@Override
	public void notifyEvent(Event event) throws IOException, SerializerException {
	}

	@Override
	public void addListener(EventListener<?> eventListener) throws Exception {
		if (eventListener.getTopic() == null) {
			eventListener.setTopic(uid);
		}
		listeners.put(eventListener.getTopic(), eventListener);
		dispatcher.addListener(eventListener);
	}

	@Override
	public void removeListener(EventListener<?> eventListener) throws Exception {
		listeners.remove(eventListener.getTopic());
		dispatcher.removeListener(eventListener);
	}

	@Override
	public Collection<EventListener<?>> getListeners() throws Exception {
		return listeners.values();
	}

}
