Changeset 103
- Timestamp:
- 10/15/13 13:21:33 (11 years ago)
- Location:
- branches/supervisor
- Files:
-
- 1 deleted
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/common/broker/HasObject.java
r100 r103 37 37 } 38 38 39 public boolean isHasObject() {39 public boolean hasObject() { 40 40 return hasObject; 41 41 } -
branches/supervisor/src/main/java/omq/common/broker/RemoteBroker.java
r92 r103 8 8 import omq.client.annotation.SyncMethod; 9 9 import omq.exception.RemoteException; 10 import omq.exception.RetryException; 10 11 11 12 public interface RemoteBroker extends Remote { … … 19 20 20 21 @SyncMethod(retry = 1, timeout = 1000) 21 public boolean hasObject(String reference); 22 public boolean hasObject(String reference) throws RetryException; 23 24 @SyncMethod(retry = 1, timeout = 1000) 25 public HasObject hasObjectInfo(String reference) throws RetryException; 22 26 23 27 } -
branches/supervisor/src/main/java/omq/common/broker/RemoteBrokerImpl.java
r92 r103 6 6 7 7 import omq.exception.RemoteException; 8 import omq.exception.RetryException; 8 9 import omq.server.RemoteObject; 9 10 … … 43 44 44 45 @Override 45 public boolean hasObject(String reference) {46 public boolean hasObject(String reference) throws RetryException { 46 47 return getBroker().getRemoteObjs().containsKey(reference); 47 48 } 48 49 50 @Override 51 public HasObject hasObjectInfo(String reference) throws RetryException { 52 if (getBroker().getRemoteObjs().containsKey(reference)) { 53 RemoteObject r = getBroker().getRemoteObjs().get(reference); 54 int numThreads = r.getPool().getWorkers().size(); 55 return new HasObject(this.getRef(), reference, true, numThreads); 56 } 57 return new HasObject(this.getRef(), reference, false, 0); 58 } 59 49 60 } -
branches/supervisor/src/main/java/omq/supervisor/Supervisor.java
r93 r103 7 7 public void subscribe(String brokerName) throws Exception; 8 8 9 public void spawnObject(OmqSettings settings ) throws Exception;9 public void spawnObject(OmqSettings settings, int numObjects) throws Exception; 10 10 11 public void unbindObject(OmqSettings settings ) throws Exception;11 public void unbindObject(OmqSettings settings, int numObjects) throws Exception; 12 12 13 13 } -
branches/supervisor/src/main/java/omq/supervisor/SupervisorImpl.java
r94 r103 10 10 11 11 import org.apache.log4j.Logger; 12 13 import com.rabbitmq.client.AMQP.Queue.DeclareOk;14 import com.rabbitmq.client.Channel;15 12 16 13 public class SupervisorImpl extends RemoteObject implements Supervisor { … … 41 38 42 39 @Override 43 public void spawnObject(OmqSettings settings ) throws Exception {40 public void spawnObject(OmqSettings settings, int numObjects) throws Exception { 44 41 45 42 String reference = settings.getReference(); … … 50 47 51 48 int minObjects = settings.getMinNumberObjects(); 52 53 Channel channel = getBroker().getNewChannel();54 55 int numObjects = 0;56 try {57 DeclareOk dok = channel.queueDeclarePassive(reference);58 numObjects = dok.getConsumerCount();59 channel.close();60 } catch (Exception io) {61 // The queue doesn't exist & the channel has been closed62 }63 49 64 50 for (RemoteBroker broker : brokers) { … … 75 61 76 62 @Override 77 public void unbindObject(OmqSettings settings ) throws Exception {63 public void unbindObject(OmqSettings settings, int numObjects) throws Exception { 78 64 String reference = settings.getReference(); 79 65 80 66 int minObjects = settings.getMinNumberObjects(); 81 67 82 Channel channel = getBroker().getNewChannel(); 68 for (RemoteBroker broker : brokers) { 69 if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) { 70 broker.deleteObject(reference); 71 break; 72 } 73 } 83 74 84 int numObjects = 0;85 try {86 DeclareOk dok = channel.queueDeclarePassive(reference);87 numObjects = dok.getConsumerCount();88 channel.close();89 90 for (RemoteBroker broker : brokers) {91 if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) {92 broker.deleteObject(reference);93 break;94 }95 }96 97 } catch (Exception io) {98 // The queue doesn't exist & the channel has been closed99 }100 75 } 101 76 -
branches/supervisor/src/main/java/omq/supervisor/SupervisorThread.java
r100 r103 3 3 import java.util.Map; 4 4 import java.util.Set; 5 6 import omq.common.broker.RemoteBroker; 7 import omq.exception.RetryException; 5 8 6 9 import org.apache.log4j.Logger; … … 49 52 int minMessages = settings.getMinNumQueued(); 50 53 51 52 54 Channel channel = supervisor.getBroker().getChannel(); 53 55 DeclareOk dok = channel.queueDeclarePassive(reference); 54 56 55 int num Consumers = dok.getConsumerCount();57 int numObjects = getNumObjects(reference); 56 58 int numMessages = dok.getMessageCount(); 57 59 58 System.out.println("Num Consumers: " + num Consumers + ", num Messages: " + numMessages);60 System.out.println("Num Consumers: " + numObjects + ", num Messages: " + numMessages); 59 61 60 if (maxMessages < numMessages || num Consumers < minObjects) {62 if (maxMessages < numMessages || numObjects < minObjects) { 61 63 logger.info("SPAWN TIME!!"); 62 supervisor.spawnObject(settings );63 } else if (numMessages < minMessages && minObjects < num Consumers) {64 supervisor.spawnObject(settings, numObjects); 65 } else if (numMessages < minMessages && minObjects < numObjects) { 64 66 logger.info("Unbinding object!!!"); 65 supervisor.unbindObject(settings );67 supervisor.unbindObject(settings, numObjects); 66 68 } 67 69 } 70 71 private int getNumObjects(String reference) { 72 int num = 0; 73 for (RemoteBroker broker : supervisor.getBrokers()) { 74 try { 75 if (broker.hasObject(reference)) { 76 num++; 77 } 78 } catch (RetryException e) { 79 e.printStackTrace(); 80 } 81 } 82 return num; 83 } 68 84 } -
branches/supervisor/src/test/java/omq/test/supervisor/SleepTest.java
r102 r103 63 63 64 64 OmqSettings settings = new OmqSettings("sleep", SleepImpl.class.getName(), env, 1, 20, 20); 65 supervisor.spawnObject(settings );65 supervisor.spawnObject(settings, 0); 66 66 67 67 Sleep sleep = broker.lookup("sleep", Sleep.class);
Note: See TracChangeset
for help on using the changeset viewer.