Changeset 93
- Timestamp:
- 10/01/13 15:34:32 (11 years ago)
- Location:
- branches/supervisor/src
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/supervisor/Supervisor.java
r92 r93 9 9 public void spawnObject(OmqSettings settings) throws Exception; 10 10 11 public void unbindObject(OmqSettings settings) throws Exception; 12 11 13 } -
branches/supervisor/src/main/java/omq/supervisor/SupervisorImpl.java
r92 r93 44 44 45 45 String reference = settings.getReference(); 46 objectSettings.put(reference, settings); 46 47 if (!objectSettings.containsKey(reference)) { 48 objectSettings.put(reference, settings); 49 } 47 50 48 51 int minObjects = settings.getMinNumberObjects(); … … 69 72 } 70 73 74 } 75 76 @Override 77 public void unbindObject(OmqSettings settings) throws Exception { 78 String reference = settings.getReference(); 79 80 int minObjects = settings.getMinNumberObjects(); 81 82 Channel channel = getBroker().getNewChannel(); 83 84 int numObjects = 0; 85 try { 86 DeclareOk dok = channel.queueDeclarePassive(reference); 87 numObjects = dok.getConsumerCount(); 88 channel.close(); 89 90 for (RemoteBroker broker : brokers) { 91 if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) { 92 broker.deleteObject(reference); 93 } 94 } 95 96 } catch (Exception io) { 97 // The queue doesn't exist & the channel has been closed 98 } 99 100 for (RemoteBroker broker : brokers) { 101 if (!broker.hasObject(reference) && minObjects >= numObjects) { 102 broker.spawnObject(reference, settings.getClassName(), settings.getProps()); 103 numObjects++; 104 if (minObjects >= numObjects) { 105 break; 106 } 107 } 108 } 71 109 } 72 110 -
branches/supervisor/src/main/java/omq/supervisor/SupervisorThread.java
r92 r93 61 61 // pregunta a tots i qui no té l'objecte li poses 62 62 } else if (numMessages < minMessages && minObjects > numConsumers) { 63 supervisor.unbindObject(settings); 63 64 // delete: 64 65 // pregunta a tots i qui té l'objecte li treus -
branches/supervisor/src/test/java/omq/test/supervisor/SleepTest.java
r92 r93 15 15 16 16 @BeforeClass 17 public static void Server() throws Exception {System.out.println("hola"); 17 public static void Server() throws Exception { 18 System.out.println("hola"); 18 19 Properties env1 = new Properties(); 19 20 env1.setProperty(ParameterQueue.USER_NAME, "guest"); … … 59 60 Supervisor supervisor = broker.lookup("supervisor", Supervisor.class); 60 61 61 OmqSettings settings = new OmqSettings("sleep", SleepImpl.class.getName(), env, 1, 70, 2);62 OmqSettings settings = new OmqSettings("sleep", SleepImpl.class.getName(), env, 20, 20, 2); 62 63 supervisor.spawnObject(settings); 63 64 64 65 Sleep sleep = broker.lookup("sleep", Sleep.class); 65 66 66 for (int i = 0; i < 50; i++) {67 for (int i = 0; i < 10; i++) { 67 68 sleep.sleep(); 68 69 } 69 70 Thread.sleep(5000); 70 for (int i = 0; i < 50; i++) {71 for (int i = 0; i < 20; i++) { 71 72 sleep.sleep(); 72 73 } 73 74 74 75 Thread.sleep(100000); 75 76 }
Note: See TracChangeset
for help on using the changeset viewer.