package omq.server.remote.request;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import omq.Remote;
import omq.common.broker.Broker;
import omq.common.event.Event;
import omq.common.event.EventListener;
import omq.common.event.EventWrapper;
import omq.common.util.ParameterQueue;
import omq.common.util.Serializer;
import omq.exception.SerializerException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * Base class for objects whose public methods are invoked through messages
 * read from a broker queue.
 * <p>
 * The object is also a {@link Thread}: once {@link #startRemoteObject} has been
 * called, {@link #run()} loops taking deliveries from the queue named after the
 * object's reference and hands each one to a {@link RemoteWrapper}.
 * {@link #invokeMethod(String, Object[])} resolves a public method of the
 * concrete subclass by name and argument types and invokes it reflectively.
 * <p>
 * {@code killed} is written by the thread that calls {@link #kill()} and read
 * by the listener thread, which is why it is declared {@code volatile}.
 *
 * @author Sergi Toda
 */
public abstract class RemoteObject extends Thread implements Remote {

    private static final long serialVersionUID = -1778953938739846450L;

    /** Retry delay (ms) used when the configured value is missing or malformed. */
    private static final long DEFAULT_RETRY_MILLIS = 2000L;

    // NOTE: UID and env are non-transient; their names are kept unchanged so
    // the serialized form of existing instances stays the same.
    private String UID;
    private Properties env;
    private transient RemoteWrapper remoteWrapper;
    private transient Map<String, List<Class<?>>> params;
    private transient Channel channel;
    private transient QueueingConsumer consumer;
    private transient volatile boolean killed = false;

    /** Maps a primitive type name to its wrapper class (used for reflective matching). */
    private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();

    static {
        // "boolean" was missing, which made the lookup return null for any
        // method declaring a boolean parameter.
        primitiveClasses.put("boolean", Boolean.class);
        primitiveClasses.put("byte", Byte.class);
        primitiveClasses.put("short", Short.class);
        primitiveClasses.put("char", Character.class);
        primitiveClasses.put("int", Integer.class);
        primitiveClasses.put("long", Long.class);
        primitiveClasses.put("float", Float.class);
        primitiveClasses.put("double", Double.class);
    }

    public RemoteObject() {
    }

    /**
     * Initialises this object and starts its listener thread.
     * <p>
     * Records the parameter types of every public method, creates the
     * {@link RemoteWrapper} with the configured number of threads (property
     * {@code ParameterQueue.NUM_THREADS}, default {@code "1"}), declares the
     * broker queues and finally starts this thread.
     *
     * @param reference identifier of this object; also used as queue name,
     *                  routing key and event exchange name
     * @param env       configuration properties
     * @throws Exception if the thread count cannot be parsed or the broker
     *                   set-up fails
     */
    public void startRemoteObject(String reference, Properties env) throws Exception {
        this.UID = reference;
        this.env = env;

        // Keyed by method name only: when a name is overloaded, the entry of
        // the last method returned by getMethods() replaces the earlier ones.
        params = new HashMap<String, List<Class<?>>>();
        for (Method m : this.getClass().getMethods()) {
            List<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(m.getParameterTypes()));
            params.put(m.getName(), types);
        }

        // Get num threads to use
        int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
        remoteWrapper = new RemoteWrapper(this, numThreads);

        startQueues();

        // Start this listener
        this.start();
    }

    /**
     * Listener loop: takes deliveries from the consumer and forwards them to
     * the {@link RemoteWrapper} until {@link #kill()} is called.
     * <p>
     * A {@link ShutdownSignalException} triggers a reconnection attempt unless
     * the object has been killed. Other exceptions are printed and the loop
     * continues.
     */
    @Override
    public void run() {
        while (!killed) {
            try {
                Delivery delivery = consumer.nextDelivery();
                System.out.println("RemoteObject: " + UID + " has received a message");
                remoteWrapper.notifyDelivery(delivery);
            } catch (InterruptedException i) {
                // kill() interrupts this thread on purpose; only report
                // interruptions that were not requested by kill().
                if (!killed) {
                    i.printStackTrace();
                }
            } catch (ShutdownSignalException e) {
                // Closing the channel in kill() also ends up here; reconnecting
                // in that case would open a channel nobody closes.
                if (!killed) {
                    e.printStackTrace();
                    reconnect();
                }
            } catch (ConsumerCancelledException e) {
                e.printStackTrace();
            } catch (Exception e) {
                // Includes SerializerException.
                e.printStackTrace();
            }
        }
    }

    /**
     * Closes the current channel if it is still open and declares the queues
     * again. When that fails, the failure is printed and the thread sleeps for
     * the configured retry time so the listener loop does not spin.
     */
    private void reconnect() {
        try {
            if (channel.isOpen()) {
                channel.close();
            }
            startQueues();
        } catch (Exception e1) {
            e1.printStackTrace();
            try {
                Thread.sleep(retryMillis());
            } catch (InterruptedException e2) {
                if (!killed) {
                    e2.printStackTrace();
                }
            }
        }
    }

    /**
     * Reads {@code ParameterQueue.RETRY_TIME_CONNECTION} from the environment.
     *
     * @return the configured delay in milliseconds, or the default of 2000 ms
     *         when the property is absent or not a valid number
     */
    private long retryMillis() {
        String value = env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000");
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException nfe) {
            // A malformed setting must not terminate the listener thread.
            return DEFAULT_RETRY_MILLIS;
        }
    }

    /**
     * @return the reference passed to {@link #startRemoteObject}
     */
    @Override
    public String getRef() {
        return UID;
    }

    /**
     * Publishes an event on the fanout exchange named after this object's
     * reference. The event's topic is set to that reference before it is
     * wrapped and serialized.
     *
     * @param event event to publish
     * @throws IOException         if declaring the exchange or publishing fails
     * @throws SerializerException if the wrapped event cannot be serialized
     */
    @Override
    public void notifyEvent(Event event) throws IOException, SerializerException {
        event.setTopic(UID);
        EventWrapper wrapper = new EventWrapper(event);
        channel.exchangeDeclare(UID, "fanout");
        channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));
    }

    /**
     * Stops the listener loop, closes the channel and stops the
     * {@link RemoteWrapper}.
     *
     * @throws IOException if closing the channel fails; the wrapper is stopped
     *                     regardless
     */
    public void kill() throws IOException {
        // Set the flag before interrupting so the listener thread sees it as
        // soon as it wakes up and does not go back to waiting or reconnecting.
        killed = true;
        interrupt();
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } finally {
            // Must run even when closing the channel fails.
            if (remoteWrapper != null) {
                remoteWrapper.stopRemoteWrapper();
            }
        }
    }

    /**
     * Invokes the public method of this object that matches the given name and
     * arguments.
     *
     * @param methodName name of the method to call
     * @param arguments  arguments to pass; {@code null} is treated as "no
     *                   arguments"
     * @return whatever the invoked method returns
     * @throws NoSuchMethodException if no public method matches
     * @throws Exception             any failure raised by the reflective call
     */
    public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
        // Get the specific method identified by methodName and its arguments
        Method method = loadMethod(methodName, arguments);
        return method.invoke(this, arguments);
    }

    /**
     * Resolves a public method by name and runtime argument types.
     * <p>
     * An exact lookup is tried first. When it fails, or when an argument is
     * {@code null} and therefore has no runtime class, the lenient search of
     * {@link #loadMethodWithPrimitives} is used.
     *
     * @param methodName name of the method
     * @param args       call arguments, may be {@code null} or contain
     *                   {@code null} elements
     * @return the resolved method
     * @throws NoSuchMethodException if nothing matches
     */
    private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
        int count = (args == null) ? 0 : args.length;
        Class<?>[] argTypes = new Class<?>[count];
        boolean hasNullArgument = false;

        for (int i = 0; i < count; i++) {
            if (args[i] == null) {
                hasNullArgument = true; // type unknown, left as null
            } else {
                argTypes[i] = args[i].getClass();
            }
        }

        if (!hasNullArgument) {
            try {
                return this.getClass().getMethod(methodName, argTypes);
            } catch (NoSuchMethodException nsm) {
                // Fall through to the lenient search below.
            }
        }
        return loadMethodWithPrimitives(methodName, argTypes);
    }

    /**
     * Searches the public methods for one with the given name whose parameters
     * accept the supplied argument types, treating a primitive parameter as
     * matching its wrapper class.
     *
     * @param methodName name of the method
     * @param argArray   runtime classes of the arguments; a {@code null}
     *                   element stands for a {@code null} argument
     * @return the first applicable method found
     * @throws NoSuchMethodException if no method is applicable
     */
    private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
        int length = (argArray == null) ? 0 : argArray.length;

        for (Method candidate : this.getClass().getMethods()) {
            if (!candidate.getName().equals(methodName)) {
                continue;
            }
            // This array can have primitive types inside
            Class<?>[] declared = candidate.getParameterTypes();
            if (declared.length == length && accepts(declared, argArray)) {
                return candidate;
            }
        }
        throw new NoSuchMethodException(methodName);
    }

    /**
     * Tells whether every declared parameter type can receive the argument at
     * the same position. Both arrays must have the same length.
     */
    private static boolean accepts(Class<?>[] declared, Class<?>[] actual) {
        for (int i = 0; i < declared.length; i++) {
            Class<?> expected = declared[i];
            Class<?> given = actual[i];

            if (expected.isPrimitive()) {
                Class<?> wrapper = primitiveClasses.get(expected.getName());
                // A null argument can never be unboxed into a primitive.
                if (wrapper == null || !wrapper.equals(given)) {
                    return false;
                }
            } else if (given != null && !expected.isAssignableFrom(given)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the parameter types recorded for a method name when the object
     * was started.
     *
     * @param methodName name of a public method
     * @return the recorded parameter types, or {@code null} when no public
     *         method has that name; for overloaded names only one overload is
     *         recorded
     */
    public List<Class<?>> getParams(String methodName) {
        return params.get(methodName);
    }

    /**
     * @return the channel currently used by this object
     */
    public Channel getChannel() {
        return channel;
    }

    /**
     * Opens a new channel, declares the direct RPC exchange, this object's
     * queue and binding, declares the fanout exchange used for events and
     * registers an auto-acknowledging consumer on the queue.
     *
     * @throws Exception if any broker operation fails
     */
    private void startQueues() throws Exception {
        // Get info about which exchange and queue will use
        String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
        String queue = UID;
        String routingKey = UID;

        // Start channel
        channel = Broker.getNewChannel();

        // Declares and bindings
        System.out.println("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
        channel.exchangeDeclare(exchange, "direct");
        channel.queueDeclare(queue, false, false, false, null);
        channel.queueBind(queue, exchange, routingKey);

        // Declare the event topic fanout
        System.out.println("RemoteObject: " + UID + " declaring fanout exchange: " + UID);
        channel.exchangeDeclare(UID, "fanout");

        // Declare a new consumer
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(queue, true, consumer);
    }

    /** No-op in this class. */
    @Override
    public void addListener(EventListener eventListener) throws Exception {
    }

    /** No-op in this class. */
    @Override
    public void removeListener(EventListener eventListener) throws Exception {
    }

    /**
     * This class keeps no listeners.
     *
     * @return always {@code null}
     */
    @Override
    public Collection getListeners() throws Exception {
        return null;
    }
}