package omq.common.broker; import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Properties; import java.util.Set; import omq.common.util.ParameterQueue; import omq.exception.RemoteException; import omq.server.RemoteObject; import omq.server.RemoteWrapper; import com.rabbitmq.client.QueueingConsumer; /** * * @author Sergi Toda * */ public class RemoteBrokerImpl extends RemoteObject implements RemoteBroker { /** * */ private static final long serialVersionUID = 1L; // fanout broker private String brokerSet; // id broker private String brokerName; public void startRemoteBroker(String brokerSet, String brokerName, Broker broker, Properties env) throws Exception { this.broker = broker; this.UID = brokerName; this.env = env; this.brokerSet = brokerSet; this.brokerName = brokerName; this.params = new HashMap>>(); for (Method m : this.getClass().getMethods()) { List> list = new ArrayList>(); for (Class clazz : m.getParameterTypes()) { list.add(clazz); } this.params.put(m.getName(), list); } // Get num threads to use int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1")); this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer()); startQueues(); // Start this listener this.start(); } private void startQueues() throws Exception { /* * Unique queue */ channel.exchangeDeclare(brokerSet, "direct"); channel.queueDeclare(brokerName, false, true, true, null); channel.queueBind(brokerName, brokerSet, brokerName); /* * Multi queue */ channel.exchangeDeclare("multi#" + brokerSet, "fanout"); channel.queueDeclare("multi#" + brokerName, false, true, true, null); channel.queueBind("multi#" + brokerName, "multi#" + brokerSet, ""); /* * Consumer */ consumer = new QueueingConsumer(channel); channel.basicConsume(brokerName, true, consumer); channel.basicConsume(brokerName + "#multi", true, consumer); } @Override public Set getRemoteObjects() { return this.broker.getRemoteObjs().keySet(); } @Override public void spawnObject(String reference, String className, Class parameterTypes, Object... args) throws Exception { RemoteObject remote = (RemoteObject) Class.forName(className).getConstructor(parameterTypes).newInstance(args); this.broker.bind(reference, remote); } @Override public void deleteObject(String reference) throws RemoteException, IOException { this.broker.unbind(reference); } @Override public HasObject hasObject(String reference) { boolean hasIt = this.broker.getRemoteObjs().containsKey(reference); return new HasObject(this.brokerName, reference, hasIt); } }