- Timestamp:
- 10/01/13 12:02:41 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/common/broker/RemoteBrokerImpl.java
r91 r92 2 2 3 3 import java.io.IOException; 4 import java.lang.reflect.Method;5 import java.util.ArrayList;6 import java.util.HashMap;7 import java.util.List;8 4 import java.util.Properties; 9 5 import java.util.Set; 10 6 11 import omq.common.util.ParameterQueue;12 7 import omq.exception.RemoteException; 13 8 import omq.server.RemoteObject; 14 import omq.server.RemoteWrapper;15 16 import com.rabbitmq.client.QueueingConsumer;17 9 18 10 /** … … 28 20 private static final long serialVersionUID = 1L; 29 21 30 // fanout broker 31 private String brokerSet; 32 // id broker 33 private String brokerName; 34 35 public void startRemoteBroker(String brokerSet, String brokerName, Broker broker, Properties env) throws Exception { 36 this.broker = broker; 37 this.UID = brokerName; 38 this.env = env; 39 this.brokerSet = brokerSet; 40 this.brokerName = brokerName; 41 42 this.params = new HashMap<String, List<Class<?>>>(); 43 for (Method m : this.getClass().getMethods()) { 44 List<Class<?>> list = new ArrayList<Class<?>>(); 45 for (Class<?> clazz : m.getParameterTypes()) { 46 list.add(clazz); 47 } 48 this.params.put(m.getName(), list); 49 } 50 51 // Get num threads to use 52 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1")); 53 this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer()); 54 55 startQueues(); 56 57 // Start this listener 58 this.start(); 59 } 60 61 private void startQueues() throws Exception { 62 /* 63 * Unique queue 64 */ 65 channel.exchangeDeclare(brokerSet, "direct"); 66 channel.queueDeclare(brokerName, false, true, true, null); 67 channel.queueBind(brokerName, brokerSet, brokerName); 68 69 /* 70 * Multi queue 71 */ 72 channel.exchangeDeclare("multi#" + brokerSet, "fanout"); 73 channel.queueDeclare("multi#" + brokerName, false, true, true, null); 74 channel.queueBind("multi#" + brokerName, "multi#" + brokerSet, ""); 75 76 /* 77 * Consumer 78 */ 79 80 consumer = new QueueingConsumer(channel); 81 channel.basicConsume(brokerName, true, consumer); 82 channel.basicConsume(brokerName + "#multi", true, consumer); 22 @Override 23 public Set<String> getRemoteObjects() { 24 return getBroker().getRemoteObjs().keySet(); 83 25 } 84 26 85 27 @Override 86 public Set<String> getRemoteObjects() { 87 return this.broker.getRemoteObjs().keySet(); 28 public void spawnObject(String reference, String className, Properties env) throws Exception { 29 RemoteObject remote = (RemoteObject) Class.forName(className).newInstance(); 30 getBroker().bind(reference, remote, env); 88 31 } 89 32 90 33 @Override 91 public void spawnObject(String reference, String className , Class<?> parameterTypes, Object... args) throws Exception {92 RemoteObject remote = (RemoteObject) Class.forName(className). getConstructor(parameterTypes).newInstance(args);93 this.broker.bind(reference, remote);34 public void spawnObject(String reference, String className) throws Exception { 35 RemoteObject remote = (RemoteObject) Class.forName(className).newInstance(); 36 getBroker().bind(reference, remote); 94 37 } 95 38 96 39 @Override 97 40 public void deleteObject(String reference) throws RemoteException, IOException { 98 this.broker.unbind(reference);41 getBroker().unbind(reference); 99 42 } 100 43 101 44 @Override 102 public HasObject hasObject(String reference) { 103 boolean hasIt = this.broker.getRemoteObjs().containsKey(reference); 104 return new HasObject(this.brokerName, reference, hasIt); 45 public boolean hasObject(String reference) { 46 return getBroker().getRemoteObjs().containsKey(reference); 105 47 } 106 48
Note: See TracChangeset
for help on using the changeset viewer.