Changeset 93


Ignore:
Timestamp:
10/01/13 15:34:32 (11 years ago)
Author:
stoda
Message:

TODO: unbind object

Location:
branches/supervisor/src
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/supervisor/Supervisor.java

    r92 r93  
    99        public void spawnObject(OmqSettings settings) throws Exception;
    1010
     11        public void unbindObject(OmqSettings settings) throws Exception;
     12
    1113}
  • branches/supervisor/src/main/java/omq/supervisor/SupervisorImpl.java

    r92 r93  
    4444
    4545                String reference = settings.getReference();
    46                 objectSettings.put(reference, settings);
     46
     47                if (!objectSettings.containsKey(reference)) {
     48                        objectSettings.put(reference, settings);
     49                }
    4750
    4851                int minObjects = settings.getMinNumberObjects();
     
    6972                }
    7073
     74        }
     75
     76        @Override
     77        public void unbindObject(OmqSettings settings) throws Exception {
     78                String reference = settings.getReference();
     79
     80                int minObjects = settings.getMinNumberObjects();
     81
     82                Channel channel = getBroker().getNewChannel();
     83
     84                int numObjects = 0;
     85                try {
     86                        DeclareOk dok = channel.queueDeclarePassive(reference);
     87                        numObjects = dok.getConsumerCount();
     88                        channel.close();
     89
     90                        for (RemoteBroker broker : brokers) {
     91                                if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) {
     92                                        broker.deleteObject(reference);
     93                                }
     94                        }
     95
     96                } catch (Exception io) {
     97                        // The queue doesn't exist & the channel has been closed
     98                }
     99
     100                for (RemoteBroker broker : brokers) {
     101                        if (!broker.hasObject(reference) && minObjects >= numObjects) {
     102                                broker.spawnObject(reference, settings.getClassName(), settings.getProps());
     103                                numObjects++;
     104                                if (minObjects >= numObjects) {
     105                                        break;
     106                                }
     107                        }
     108                }
    71109        }
    72110
  • branches/supervisor/src/main/java/omq/supervisor/SupervisorThread.java

    r92 r93  
    6161                        // pregunta a tots i qui no té l'objecte li poses
    6262                } else if (numMessages < minMessages && minObjects > numConsumers) {
     63                        supervisor.unbindObject(settings);
    6364                        // delete:
    6465                        // pregunta a tots i qui té l'objecte li treus
  • branches/supervisor/src/test/java/omq/test/supervisor/SleepTest.java

    r92 r93  
    1515
    1616        @BeforeClass
    17         public static void Server() throws Exception {System.out.println("hola");
     17        public static void Server() throws Exception {
     18                System.out.println("hola");
    1819                Properties env1 = new Properties();
    1920                env1.setProperty(ParameterQueue.USER_NAME, "guest");
     
    5960                Supervisor supervisor = broker.lookup("supervisor", Supervisor.class);
    6061
    61                 OmqSettings settings = new OmqSettings("sleep", SleepImpl.class.getName(), env, 1, 70, 2);
     62                OmqSettings settings = new OmqSettings("sleep", SleepImpl.class.getName(), env, 20, 20, 2);
    6263                supervisor.spawnObject(settings);
    6364
    6465                Sleep sleep = broker.lookup("sleep", Sleep.class);
    6566
    66                 for (int i = 0; i < 50; i++) {
     67                for (int i = 0; i < 10; i++) {
    6768                        sleep.sleep();
    6869                }
    6970                Thread.sleep(5000);
    70                 for (int i = 0; i < 50; i++) {
     71                for (int i = 0; i < 20; i++) {
    7172                        sleep.sleep();
    7273                }
    73                
     74
    7475                Thread.sleep(100000);
    7576        }
Note: See TracChangeset for help on using the changeset viewer.