Changeset 92 for branches/supervisor/src/main/java/omq/common/broker
- Timestamp:
- 10/01/13 12:02:41 (12 years ago)
- Location:
- branches/supervisor/src/main/java/omq/common/broker
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/common/broker/Broker.java
r91 r92 20 20 import omq.exception.RemoteException; 21 21 import omq.server.RemoteObject; 22 import omq.supervisor.Supervisor; 22 23 23 24 import org.apache.log4j.Logger; … … 49 50 private boolean connectionClosed = false; 50 51 private Properties environment = null; 51 private RemoteBrokerImpl remoteBrokerImpl;52 52 private Map<String, RemoteObject> remoteObjs; 53 53 private Map<String, Object> proxies = new Hashtable<String, Object>(); 54 54 private Map<String, Object> multiProxies = new Hashtable<String, Object>(); 55 56 // Supervisor 57 private Supervisor supervisor; 55 58 56 59 public Broker(Properties env) throws Exception { … … 358 361 } 359 362 360 public void setSupervisor(String brokerSet, String brokerName) throws Exception {361 remoteBrokerImpl = new RemoteBrokerImpl();362 remoteBrokerImpl.startRemoteBroker(brokerSet, brokerName, this, getEnvironment());363 }364 365 363 public Properties getEnvironment() { 366 364 return environment; … … 379 377 } 380 378 379 /* 380 * Supervisor 381 */ 382 public void setSupervisor(String supervisorName, String brokerName) throws Exception { 383 // Create a RemoteBrokerImpl 384 bind(brokerName, new RemoteBrokerImpl()); 385 // Subscribe broker 386 supervisor = lookup(supervisorName, Supervisor.class); 387 supervisor.subscribe(brokerName); 388 logger.info("Supervisor set: " + supervisorName + ", BrokerName: " + brokerName); 389 } 390 391 public Supervisor getSupervisor() { 392 return supervisor; 393 } 381 394 } -
branches/supervisor/src/main/java/omq/common/broker/RemoteBroker.java
r91 r92 2 2 3 3 import java.io.IOException; 4 import java.util.Properties; 4 5 import java.util.Set; 5 6 6 7 import omq.Remote; 8 import omq.client.annotation.SyncMethod; 7 9 import omq.exception.RemoteException; 8 10 … … 10 12 public Set<String> getRemoteObjects(); 11 13 12 public void spawnObject(String reference, String className, Class<?> parameterTypes, Object... args) throws Exception; 14 public void spawnObject(String reference, String className, Properties env) throws Exception; 15 16 public void spawnObject(String reference, String className) throws Exception; 13 17 14 18 public void deleteObject(String reference) throws RemoteException, IOException; 15 19 16 public HasObject hasObject(String reference); 20 @SyncMethod(retry = 1, timeout = 1000) 21 public boolean hasObject(String reference); 17 22 18 23 } -
branches/supervisor/src/main/java/omq/common/broker/RemoteBrokerImpl.java
r91 r92 2 2 3 3 import java.io.IOException; 4 import java.lang.reflect.Method;5 import java.util.ArrayList;6 import java.util.HashMap;7 import java.util.List;8 4 import java.util.Properties; 9 5 import java.util.Set; 10 6 11 import omq.common.util.ParameterQueue;12 7 import omq.exception.RemoteException; 13 8 import omq.server.RemoteObject; 14 import omq.server.RemoteWrapper;15 16 import com.rabbitmq.client.QueueingConsumer;17 9 18 10 /** … … 28 20 private static final long serialVersionUID = 1L; 29 21 30 // fanout broker 31 private String brokerSet; 32 // id broker 33 private String brokerName; 34 35 public void startRemoteBroker(String brokerSet, String brokerName, Broker broker, Properties env) throws Exception { 36 this.broker = broker; 37 this.UID = brokerName; 38 this.env = env; 39 this.brokerSet = brokerSet; 40 this.brokerName = brokerName; 41 42 this.params = new HashMap<String, List<Class<?>>>(); 43 for (Method m : this.getClass().getMethods()) { 44 List<Class<?>> list = new ArrayList<Class<?>>(); 45 for (Class<?> clazz : m.getParameterTypes()) { 46 list.add(clazz); 47 } 48 this.params.put(m.getName(), list); 49 } 50 51 // Get num threads to use 52 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1")); 53 this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer()); 54 55 startQueues(); 56 57 // Start this listener 58 this.start(); 59 } 60 61 private void startQueues() throws Exception { 62 /* 63 * Unique queue 64 */ 65 channel.exchangeDeclare(brokerSet, "direct"); 66 channel.queueDeclare(brokerName, false, true, true, null); 67 channel.queueBind(brokerName, brokerSet, brokerName); 68 69 /* 70 * Multi queue 71 */ 72 channel.exchangeDeclare("multi#" + brokerSet, "fanout"); 73 channel.queueDeclare("multi#" + brokerName, false, true, true, null); 74 channel.queueBind("multi#" + brokerName, "multi#" + brokerSet, ""); 75 76 /* 77 * Consumer 78 */ 79 80 consumer = new QueueingConsumer(channel); 81 channel.basicConsume(brokerName, true, consumer); 82 channel.basicConsume(brokerName + "#multi", true, consumer); 22 @Override 23 public Set<String> getRemoteObjects() { 24 return getBroker().getRemoteObjs().keySet(); 83 25 } 84 26 85 27 @Override 86 public Set<String> getRemoteObjects() { 87 return this.broker.getRemoteObjs().keySet(); 28 public void spawnObject(String reference, String className, Properties env) throws Exception { 29 RemoteObject remote = (RemoteObject) Class.forName(className).newInstance(); 30 getBroker().bind(reference, remote, env); 88 31 } 89 32 90 33 @Override 91 public void spawnObject(String reference, String className , Class<?> parameterTypes, Object... args) throws Exception {92 RemoteObject remote = (RemoteObject) Class.forName(className). getConstructor(parameterTypes).newInstance(args);93 this.broker.bind(reference, remote);34 public void spawnObject(String reference, String className) throws Exception { 35 RemoteObject remote = (RemoteObject) Class.forName(className).newInstance(); 36 getBroker().bind(reference, remote); 94 37 } 95 38 96 39 @Override 97 40 public void deleteObject(String reference) throws RemoteException, IOException { 98 this.broker.unbind(reference);41 getBroker().unbind(reference); 99 42 } 100 43 101 44 @Override 102 public HasObject hasObject(String reference) { 103 boolean hasIt = this.broker.getRemoteObjs().containsKey(reference); 104 return new HasObject(this.brokerName, reference, hasIt); 45 public boolean hasObject(String reference) { 46 return getBroker().getRemoteObjs().containsKey(reference); 105 47 } 106 48
Note: See TracChangeset
for help on using the changeset viewer.