package omq.client.proxy; import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.Collection; import java.util.HashMap; import java.util.Hashtable; import java.util.Map; import java.util.Properties; import org.apache.log4j.Logger; import omq.Remote; import omq.client.annotation.AsyncMethod; import omq.client.annotation.MultiMethod; import omq.client.annotation.SyncMethod; import omq.client.listener.ResponseListener; import omq.common.broker.Broker; import omq.common.event.Event; import omq.common.event.EventDispatcher; import omq.common.event.EventListener; import omq.common.message.Request; import omq.common.message.Response; import omq.common.util.ParameterQueue; import omq.common.util.Serializer; import omq.exception.NoContainsInstanceException; import omq.exception.OmqException; import omq.exception.RetryException; import omq.exception.SerializerException; import omq.exception.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; /** * EvoProxy class. This class inherits from InvocationHandler and gives you a * proxy with a server using an environment * * @author Sergi Toda * */ public class Proxymq implements InvocationHandler, Remote { /** * */ private static final long serialVersionUID = 1L; private static final Logger logger = Logger.getLogger(Proxymq.class.getName()); private static Map proxies = new Hashtable(); private String uid; private transient String serializerType; private transient Broker broker; private transient ResponseListener rListener; private transient EventDispatcher dispatcher; private transient Serializer serializer; // private transient Channel channel; private transient Properties env; private transient Map results; private transient Map> listeners; private static final Map> primitiveClasses = new HashMap>(); static { primitiveClasses.put("byte", Byte.class); primitiveClasses.put("short", Short.class); primitiveClasses.put("char", Character.class); primitiveClasses.put("int", Integer.class); primitiveClasses.put("long", Long.class); primitiveClasses.put("float", Float.class); primitiveClasses.put("double", Double.class); } /** * EvoProxy Constructor. * * This constructor uses an uid to know which object will call. It also uses * Properties to set where to send the messages * * @param uid * The uid represents the unique identifier of a remote object * @param clazz * It represents the real class of the remote object. With this * class the system can know the remoteInterface used and it can * also see which annotations are used * @param env * The environment is used to know where to send the messages * @throws Exception */ public Proxymq(String uid, Class clazz, Broker broker) throws Exception { this.uid = uid; this.broker = broker; rListener = broker.getResponseListener(); dispatcher = broker.getEventDispatcher(); serializer = broker.getSerializer(); // TODO what is better to use a new channel or to use the same? // this.channel = Broker.getChannel(); env = broker.getEnvironment(); // set the serializer type serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java); listeners = new HashMap>(); // Create a new hashmap and registry it in rListener results = new HashMap(); rListener.registerProxy(this); } @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { // long timeStart = (new Date()).getTime(); // Local methods only String methodName = method.getName(); // The local methods will be invoked here if (method.getDeclaringClass().equals(Remote.class)) { if (methodName.equals("getRef")) { return getRef(); } else if (methodName.equals("addListener")) { addListener((EventListener) arguments[0]); return null; } else if (methodName.equals("removeListener")) { removeListener((EventListener) arguments[0]); return null; } else if (methodName.equals("getListeners")) { return getListeners(); } } // Create the request Request request = createRequest(method, arguments); Object response = null; // Publish the request if (request.isAsync()) { logger.debug("Publish async request -> " + request.getId()); publishAsyncRequest(request); } else { logger.debug("Publish sync request -> " + request.getId()); response = publishSyncRequest(request, method.getReturnType()); } return response; } private void publishMessage(Request request, String replyQueueName) throws Exception { String corrId = request.getId(); // Get the environment properties String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE); String routingkey = this.uid; // Add the correlation ID and create a replyTo property BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build(); // Publish the message byte[] bytesRequest = serializer.serialize(serializerType, request); // TODO See this // channel.basicPublish(exchange, routingkey, props, bytesRequest); broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest); } private void publishAsyncRequest(Request request) throws Exception { // Get the environment properties String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); publishMessage(request, replyQueueName); } private Object publishSyncRequest(Request request, Class type) throws Exception { String corrId = request.getId(); int retries = request.getRetries(); long timeout = request.getTimeout(); // Get the environment properties String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); // Publish the message int i = 0; while (i < retries) { try { publishMessage(request, replyQueueName); return getResult(corrId, timeout, type); } catch (TimeoutException te) { logger.error(te); } i++; } throw new RetryException(retries, timeout); } private Request createRequest(Method method, Object[] arguments) { String corrId = java.util.UUID.randomUUID().toString(); String methodName = method.getName(); boolean multi = false; if (method.getAnnotation(MultiMethod.class) != null) { multi = true; } // Since we need to know whether the method is async and if it has to // return using an annotation, we'll only check the AsyncMethod // annotation if (method.getAnnotation(AsyncMethod.class) == null) { int retries = 1; long timeout = ParameterQueue.DEFAULT_TIMEOUT; if (method.getAnnotation(SyncMethod.class) != null) { SyncMethod sync = method.getAnnotation(SyncMethod.class); retries = sync.retry(); timeout = sync.timeout(); } return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout); } else { return Request.newAsyncRequest(corrId, methodName, arguments, multi); } } private Object getResult(String corrId, long timeout, Class type) throws Exception { Response resp = null; // Wait for the results. long localTimeout = 0; long start = System.currentTimeMillis(); synchronized (results) { // Due to we are using notifyAll(), we need to control the real time while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) { results.wait(timeout); localTimeout = System.currentTimeMillis() - start; } if ((timeout - localTimeout) <= 0) { throw new TimeoutException("Timeout exception time: " + timeout); } resp = serializer.deserializeResponse(results.get(corrId), type); // Remove and indicate the key exists (a hashmap can contain a null // object, using this we'll know whether a response has been // received before) results.put(corrId, null); } if (resp.getError() != null) { OmqException error = resp.getError(); String name = error.getType(); String message = error.getMessage(); throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message); } return resp.getResult(); } /** * * @param reference * RemoteObject reference * @return true if the proxy has been created before or false in the other * case */ public static boolean containsProxy(String reference) { return proxies.containsKey(reference); } /** * * @param reference * RemoteObject reference * @return a proxy instance * @throws NoContainsInstanceException */ public static Object getInstance(String reference) throws NoContainsInstanceException { if (!containsProxy(reference)) { throw new NoContainsInstanceException(reference); } return proxies.get(reference); } /** * Returns an instance of a proxy class for the specified interfaces that * dispatches method invocations to the specified invocation handler. * @param * loader * * @param loader * the class loader to define the proxy class * * @param interfaces * the list of interfaces for the proxy class to implement * @param proxy * the invocation handler to dispatch method invocations to * @return a proxy instance with the specified invocation handler of a proxy * class that is defined by the specified class loader and that * implements the specified interfaces */ public static Object newProxyInstance(ClassLoader loader, Class[] interfaces, Proxymq proxy) { if (proxies.containsKey(proxy.getRef())) { return proxies.get(proxy.getRef()); } Object value = Proxy.newProxyInstance(loader, interfaces, proxy); proxies.put(proxy.getRef(), value); return value; } /** * Gets the Map used internally to retreive the response of the server * * @return a map with all the keys processed. Every key is a correlation id * of a method invoked remotely */ public Map getResults() { return results; } public static void stopProxy() { proxies = new HashMap(); } public static Map getProxies() { return proxies; } public static void setProxies(Map proxies) { Proxymq.proxies = proxies; } @Override public String getRef() { return uid; } @Override public void notifyEvent(Event event) throws IOException, SerializerException { } @Override public void addListener(EventListener eventListener) throws Exception { if (eventListener.getTopic() == null) { eventListener.setTopic(uid); } listeners.put(eventListener.getTopic(), eventListener); dispatcher.addListener(eventListener); } @Override public void removeListener(EventListener eventListener) throws Exception { listeners.remove(eventListener.getTopic()); dispatcher.removeListener(eventListener); } @Override public Collection> getListeners() throws Exception { return listeners.values(); } }