Changeset 92
- Timestamp:
- 10/01/13 12:02:41 (11 years ago)
- Location:
- branches/supervisor
- Files:
-
- 7 added
- 2 deleted
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java
r84 r92 116 116 if (methodName.equals("getRef")) { 117 117 return getRef(); 118 } 119 if (methodName.equals("equals")) { 120 if (arguments[0] instanceof Remote) { 121 return getRef().equals(((Remote) arguments[0]).getRef()); 122 } else { 123 return false; 124 } 118 125 } 119 126 } -
branches/supervisor/src/main/java/omq/common/broker/Broker.java
r91 r92 20 20 import omq.exception.RemoteException; 21 21 import omq.server.RemoteObject; 22 import omq.supervisor.Supervisor; 22 23 23 24 import org.apache.log4j.Logger; … … 49 50 private boolean connectionClosed = false; 50 51 private Properties environment = null; 51 private RemoteBrokerImpl remoteBrokerImpl;52 52 private Map<String, RemoteObject> remoteObjs; 53 53 private Map<String, Object> proxies = new Hashtable<String, Object>(); 54 54 private Map<String, Object> multiProxies = new Hashtable<String, Object>(); 55 56 // Supervisor 57 private Supervisor supervisor; 55 58 56 59 public Broker(Properties env) throws Exception { … … 358 361 } 359 362 360 public void setSupervisor(String brokerSet, String brokerName) throws Exception {361 remoteBrokerImpl = new RemoteBrokerImpl();362 remoteBrokerImpl.startRemoteBroker(brokerSet, brokerName, this, getEnvironment());363 }364 365 363 public Properties getEnvironment() { 366 364 return environment; … … 379 377 } 380 378 379 /* 380 * Supervisor 381 */ 382 public void setSupervisor(String supervisorName, String brokerName) throws Exception { 383 // Create a RemoteBrokerImpl 384 bind(brokerName, new RemoteBrokerImpl()); 385 // Subscribe broker 386 supervisor = lookup(supervisorName, Supervisor.class); 387 supervisor.subscribe(brokerName); 388 logger.info("Supervisor set: " + supervisorName + ", BrokerName: " + brokerName); 389 } 390 391 public Supervisor getSupervisor() { 392 return supervisor; 393 } 381 394 } -
branches/supervisor/src/main/java/omq/common/broker/RemoteBroker.java
r91 r92 2 2 3 3 import java.io.IOException; 4 import java.util.Properties; 4 5 import java.util.Set; 5 6 6 7 import omq.Remote; 8 import omq.client.annotation.SyncMethod; 7 9 import omq.exception.RemoteException; 8 10 … … 10 12 public Set<String> getRemoteObjects(); 11 13 12 public void spawnObject(String reference, String className, Class<?> parameterTypes, Object... args) throws Exception; 14 public void spawnObject(String reference, String className, Properties env) throws Exception; 15 16 public void spawnObject(String reference, String className) throws Exception; 13 17 14 18 public void deleteObject(String reference) throws RemoteException, IOException; 15 19 16 public HasObject hasObject(String reference); 20 @SyncMethod(retry = 1, timeout = 1000) 21 public boolean hasObject(String reference); 17 22 18 23 } -
branches/supervisor/src/main/java/omq/common/broker/RemoteBrokerImpl.java
r91 r92 2 2 3 3 import java.io.IOException; 4 import java.lang.reflect.Method;5 import java.util.ArrayList;6 import java.util.HashMap;7 import java.util.List;8 4 import java.util.Properties; 9 5 import java.util.Set; 10 6 11 import omq.common.util.ParameterQueue;12 7 import omq.exception.RemoteException; 13 8 import omq.server.RemoteObject; 14 import omq.server.RemoteWrapper;15 16 import com.rabbitmq.client.QueueingConsumer;17 9 18 10 /** … … 28 20 private static final long serialVersionUID = 1L; 29 21 30 // fanout broker 31 private String brokerSet; 32 // id broker 33 private String brokerName; 34 35 public void startRemoteBroker(String brokerSet, String brokerName, Broker broker, Properties env) throws Exception { 36 this.broker = broker; 37 this.UID = brokerName; 38 this.env = env; 39 this.brokerSet = brokerSet; 40 this.brokerName = brokerName; 41 42 this.params = new HashMap<String, List<Class<?>>>(); 43 for (Method m : this.getClass().getMethods()) { 44 List<Class<?>> list = new ArrayList<Class<?>>(); 45 for (Class<?> clazz : m.getParameterTypes()) { 46 list.add(clazz); 47 } 48 this.params.put(m.getName(), list); 49 } 50 51 // Get num threads to use 52 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1")); 53 this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer()); 54 55 startQueues(); 56 57 // Start this listener 58 this.start(); 59 } 60 61 private void startQueues() throws Exception { 62 /* 63 * Unique queue 64 */ 65 channel.exchangeDeclare(brokerSet, "direct"); 66 channel.queueDeclare(brokerName, false, true, true, null); 67 channel.queueBind(brokerName, brokerSet, brokerName); 68 69 /* 70 * Multi queue 71 */ 72 channel.exchangeDeclare("multi#" + brokerSet, "fanout"); 73 channel.queueDeclare("multi#" + brokerName, false, true, true, null); 74 channel.queueBind("multi#" + brokerName, "multi#" + brokerSet, ""); 75 76 /* 77 * Consumer 78 */ 79 80 consumer = new QueueingConsumer(channel); 81 channel.basicConsume(brokerName, true, consumer); 82 channel.basicConsume(brokerName + "#multi", true, consumer); 22 @Override 23 public Set<String> getRemoteObjects() { 24 return getBroker().getRemoteObjs().keySet(); 83 25 } 84 26 85 27 @Override 86 public Set<String> getRemoteObjects() { 87 return this.broker.getRemoteObjs().keySet(); 28 public void spawnObject(String reference, String className, Properties env) throws Exception { 29 RemoteObject remote = (RemoteObject) Class.forName(className).newInstance(); 30 getBroker().bind(reference, remote, env); 88 31 } 89 32 90 33 @Override 91 public void spawnObject(String reference, String className , Class<?> parameterTypes, Object... args) throws Exception {92 RemoteObject remote = (RemoteObject) Class.forName(className). getConstructor(parameterTypes).newInstance(args);93 this.broker.bind(reference, remote);34 public void spawnObject(String reference, String className) throws Exception { 35 RemoteObject remote = (RemoteObject) Class.forName(className).newInstance(); 36 getBroker().bind(reference, remote); 94 37 } 95 38 96 39 @Override 97 40 public void deleteObject(String reference) throws RemoteException, IOException { 98 this.broker.unbind(reference);41 getBroker().unbind(reference); 99 42 } 100 43 101 44 @Override 102 public HasObject hasObject(String reference) { 103 boolean hasIt = this.broker.getRemoteObjs().containsKey(reference); 104 return new HasObject(this.brokerName, reference, hasIt); 45 public boolean hasObject(String reference) { 46 return getBroker().getRemoteObjs().containsKey(reference); 105 47 } 106 48 -
branches/supervisor/src/main/java/omq/server/RemoteObject.java
r91 r92 38 38 private static final Logger logger = Logger.getLogger(RemoteObject.class.getName()); 39 39 40 pr otectedString UID;41 pr otectedProperties env;42 pr otectedtransient Broker broker;43 pr otectedtransient String multiQueue;44 pr otectedtransient RemoteWrapper remoteWrapper;45 pr otectedtransient Map<String, List<Class<?>>> params;46 pr otectedtransient Channel channel;47 pr otectedtransient QueueingConsumer consumer;48 pr otectedtransient boolean killed = false;40 private String UID; 41 private Properties env; 42 private transient Broker broker; 43 private transient String multiQueue; 44 private transient RemoteWrapper remoteWrapper; 45 private transient Map<String, List<Class<?>>> params; 46 private transient Channel channel; 47 private transient QueueingConsumer consumer; 48 private transient boolean killed = false; 49 49 50 50 private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>(); … … 254 254 } 255 255 256 public Broker getBroker() { 257 return broker; 258 } 259 256 260 /** 257 261 * This method starts the queues using the information got in the … … 319 323 boolean autoAck = false; 320 324 325 //TODO see if this is useless 326 int prefetchCount = 1; 327 channel.basicQos(prefetchCount); 328 321 329 // Declare a new consumer 322 330 consumer = new QueueingConsumer(channel); -
branches/supervisor/src/main/java/omq/supervisor/OmqSettings.java
r91 r92 1 1 package omq.supervisor; 2 2 3 import java.io.Serializable; 3 4 import java.util.Properties; 4 5 5 public class OmqSettings { 6 public class OmqSettings implements Serializable { 7 8 6 9 10 /** 11 * 12 */ 13 private static final long serialVersionUID = 1L; 7 14 private String reference; 8 15 private String className; -
branches/supervisor/src/main/java/omq/supervisor/Supervisor.java
r91 r92 1 1 package omq.supervisor; 2 2 3 import java.util.Map; 4 import java.util.Set; 3 import omq.Remote; 5 4 6 import omq.common.broker.RemoteBroker; 5 public interface Supervisor extends Remote { 7 6 8 public class Supervisor { 7 public void subscribe(String brokerName) throws Exception; 9 8 10 private Set<String> bindReferences; 11 private Map<String, RemoteBroker> brokers; 9 public void spawnObject(OmqSettings settings) throws Exception; 12 10 13 private void checkObject() {14 String reference = null;15 16 int numObjects = 0;17 18 if(minObjects > numObjects || maxMessages < numEncuats){19 spawn:20 pregunta a tots i qui no té l'objecte li poses21 }else if(numEncuats < minMessages && minObjects > numObjects){22 delete:23 pregunta a tots i qui té l'objecte li treus24 }25 }26 11 } -
branches/supervisor/src/test/java/omq/test/lock/SleepTest.java
r91 r92 20 20 env.setProperty(ParameterQueue.RABBIT_HOST, "127.0.0.1"); 21 21 env.setProperty(ParameterQueue.RABBIT_PORT, "5672"); 22 env.setProperty(ParameterQueue.NUM_THREADS, " 2");22 env.setProperty(ParameterQueue.NUM_THREADS, "1"); 23 23 24 SleepImpl sleep = new SleepImpl(); 24 for (int i = 0; i < 4; i++) { 25 Broker broker = new Broker(env); 26 broker.bind("sleep", new SleepImpl()); 27 } 25 28 26 Broker broker = new Broker(env);27 broker.bind("sleep", sleep);28 29 } 29 30
Note: See TracChangeset
for help on using the changeset viewer.