Ignore:
Timestamp:
10/15/13 13:21:33 (11 years ago)
Author:
stoda
Message:

Supervisor done

TODO: revise supervisor

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/supervisor/SupervisorImpl.java

    r94 r103  
    1010
    1111import org.apache.log4j.Logger;
    12 
    13 import com.rabbitmq.client.AMQP.Queue.DeclareOk;
    14 import com.rabbitmq.client.Channel;
    1512
    1613public class SupervisorImpl extends RemoteObject implements Supervisor {
     
    4138
    4239        @Override
    43         public void spawnObject(OmqSettings settings) throws Exception {
     40        public void spawnObject(OmqSettings settings, int numObjects) throws Exception {
    4441
    4542                String reference = settings.getReference();
     
    5047
    5148                int minObjects = settings.getMinNumberObjects();
    52 
    53                 Channel channel = getBroker().getNewChannel();
    54 
    55                 int numObjects = 0;
    56                 try {
    57                         DeclareOk dok = channel.queueDeclarePassive(reference);
    58                         numObjects = dok.getConsumerCount();
    59                         channel.close();
    60                 } catch (Exception io) {
    61                         // The queue doesn't exist & the channel has been closed
    62                 }
    6349
    6450                for (RemoteBroker broker : brokers) {
     
    7561
    7662        @Override
    77         public void unbindObject(OmqSettings settings) throws Exception {
     63        public void unbindObject(OmqSettings settings, int numObjects) throws Exception {
    7864                String reference = settings.getReference();
    7965
    8066                int minObjects = settings.getMinNumberObjects();
    8167
    82                 Channel channel = getBroker().getNewChannel();
     68                for (RemoteBroker broker : brokers) {
     69                        if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) {
     70                                broker.deleteObject(reference);
     71                                break;
     72                        }
     73                }
    8374
    84                 int numObjects = 0;
    85                 try {
    86                         DeclareOk dok = channel.queueDeclarePassive(reference);
    87                         numObjects = dok.getConsumerCount();
    88                         channel.close();
    89 
    90                         for (RemoteBroker broker : brokers) {
    91                                 if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) {
    92                                         broker.deleteObject(reference);
    93                                         break;
    94                                 }
    95                         }
    96 
    97                 } catch (Exception io) {
    98                         // The queue doesn't exist & the channel has been closed
    99                 }
    10075        }
    10176
Note: See TracChangeset for help on using the changeset viewer.