Ignore:
Timestamp:
10/15/13 13:21:33 (11 years ago)
Author:
stoda
Message:

Supervisor done

TODO: revise supervisor

Location:
branches/supervisor/src
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/common/broker/HasObject.java

    r100 r103  
    3737        }
    3838
    39         public boolean isHasObject() {
     39        public boolean hasObject() {
    4040                return hasObject;
    4141        }
  • branches/supervisor/src/main/java/omq/common/broker/RemoteBroker.java

    r92 r103  
    88import omq.client.annotation.SyncMethod;
    99import omq.exception.RemoteException;
     10import omq.exception.RetryException;
    1011
    1112public interface RemoteBroker extends Remote {
     
    1920
    2021        @SyncMethod(retry = 1, timeout = 1000)
    21         public boolean hasObject(String reference);
     22        public boolean hasObject(String reference) throws RetryException;
     23
     24        @SyncMethod(retry = 1, timeout = 1000)
     25        public HasObject hasObjectInfo(String reference) throws RetryException;
    2226
    2327}
  • branches/supervisor/src/main/java/omq/common/broker/RemoteBrokerImpl.java

    r92 r103  
    66
    77import omq.exception.RemoteException;
     8import omq.exception.RetryException;
    89import omq.server.RemoteObject;
    910
     
    4344
    4445        @Override
    45         public boolean hasObject(String reference) {
     46        public boolean hasObject(String reference) throws RetryException {
    4647                return getBroker().getRemoteObjs().containsKey(reference);
    4748        }
    4849
     50        @Override
     51        public HasObject hasObjectInfo(String reference) throws RetryException {
     52                if (getBroker().getRemoteObjs().containsKey(reference)) {
     53                        RemoteObject r = getBroker().getRemoteObjs().get(reference);
     54                        int numThreads = r.getPool().getWorkers().size();
     55                        return new HasObject(this.getRef(), reference, true, numThreads);
     56                }
     57                return new HasObject(this.getRef(), reference, false, 0);
     58        }
     59
    4960}
  • branches/supervisor/src/main/java/omq/supervisor/Supervisor.java

    r93 r103  
    77        public void subscribe(String brokerName) throws Exception;
    88
    9         public void spawnObject(OmqSettings settings) throws Exception;
     9        public void spawnObject(OmqSettings settings, int numObjects) throws Exception;
    1010
    11         public void unbindObject(OmqSettings settings) throws Exception;
     11        public void unbindObject(OmqSettings settings, int numObjects) throws Exception;
    1212
    1313}
  • branches/supervisor/src/main/java/omq/supervisor/SupervisorImpl.java

    r94 r103  
    1010
    1111import org.apache.log4j.Logger;
    12 
    13 import com.rabbitmq.client.AMQP.Queue.DeclareOk;
    14 import com.rabbitmq.client.Channel;
    1512
    1613public class SupervisorImpl extends RemoteObject implements Supervisor {
     
    4138
    4239        @Override
    43         public void spawnObject(OmqSettings settings) throws Exception {
     40        public void spawnObject(OmqSettings settings, int numObjects) throws Exception {
    4441
    4542                String reference = settings.getReference();
     
    5047
    5148                int minObjects = settings.getMinNumberObjects();
    52 
    53                 Channel channel = getBroker().getNewChannel();
    54 
    55                 int numObjects = 0;
    56                 try {
    57                         DeclareOk dok = channel.queueDeclarePassive(reference);
    58                         numObjects = dok.getConsumerCount();
    59                         channel.close();
    60                 } catch (Exception io) {
    61                         // The queue doesn't exist & the channel has been closed
    62                 }
    6349
    6450                for (RemoteBroker broker : brokers) {
     
    7561
    7662        @Override
    77         public void unbindObject(OmqSettings settings) throws Exception {
     63        public void unbindObject(OmqSettings settings, int numObjects) throws Exception {
    7864                String reference = settings.getReference();
    7965
    8066                int minObjects = settings.getMinNumberObjects();
    8167
    82                 Channel channel = getBroker().getNewChannel();
     68                for (RemoteBroker broker : brokers) {
     69                        if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) {
     70                                broker.deleteObject(reference);
     71                                break;
     72                        }
     73                }
    8374
    84                 int numObjects = 0;
    85                 try {
    86                         DeclareOk dok = channel.queueDeclarePassive(reference);
    87                         numObjects = dok.getConsumerCount();
    88                         channel.close();
    89 
    90                         for (RemoteBroker broker : brokers) {
    91                                 if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) {
    92                                         broker.deleteObject(reference);
    93                                         break;
    94                                 }
    95                         }
    96 
    97                 } catch (Exception io) {
    98                         // The queue doesn't exist & the channel has been closed
    99                 }
    10075        }
    10176
  • branches/supervisor/src/main/java/omq/supervisor/SupervisorThread.java

    r100 r103  
    33import java.util.Map;
    44import java.util.Set;
     5
     6import omq.common.broker.RemoteBroker;
     7import omq.exception.RetryException;
    58
    69import org.apache.log4j.Logger;
     
    4952                int minMessages = settings.getMinNumQueued();
    5053
    51        
    5254                Channel channel = supervisor.getBroker().getChannel();
    5355                DeclareOk dok = channel.queueDeclarePassive(reference);
    5456
    55                 int numConsumers = dok.getConsumerCount();
     57                int numObjects = getNumObjects(reference);
    5658                int numMessages = dok.getMessageCount();
    5759
    58                 System.out.println("Num Consumers: " + numConsumers + ", num Messages: " + numMessages);
     60                System.out.println("Num Consumers: " + numObjects + ", num Messages: " + numMessages);
    5961
    60                 if (maxMessages < numMessages || numConsumers < minObjects) {
     62                if (maxMessages < numMessages || numObjects < minObjects) {
    6163                        logger.info("SPAWN TIME!!");
    62                         supervisor.spawnObject(settings);
    63                 } else if (numMessages < minMessages && minObjects < numConsumers) {
     64                        supervisor.spawnObject(settings, numObjects);
     65                } else if (numMessages < minMessages && minObjects < numObjects) {
    6466                        logger.info("Unbinding object!!!");
    65                         supervisor.unbindObject(settings);
     67                        supervisor.unbindObject(settings, numObjects);
    6668                }
    6769        }
     70
     71        private int getNumObjects(String reference) {
     72                int num = 0;
     73                for (RemoteBroker broker : supervisor.getBrokers()) {
     74                        try {
     75                                if (broker.hasObject(reference)) {
     76                                        num++;
     77                                }
     78                        } catch (RetryException e) {
     79                                e.printStackTrace();
     80                        }
     81                }
     82                return num;
     83        }
    6884}
  • branches/supervisor/src/test/java/omq/test/supervisor/SleepTest.java

    r102 r103  
    6363
    6464                OmqSettings settings = new OmqSettings("sleep", SleepImpl.class.getName(), env, 1, 20, 20);
    65                 supervisor.spawnObject(settings);
     65                supervisor.spawnObject(settings, 0);
    6666
    6767                Sleep sleep = broker.lookup("sleep", Sleep.class);
Note: See TracChangeset for help on using the changeset viewer.