Changeset 106
- Timestamp:
- 10/18/13 16:09:36 (11 years ago)
- Location:
- branches/supervisor
- Files:
-
- 1 added
- 3 deleted
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/common/broker/Broker.java
r99 r106 240 240 } 241 241 242 public void bind(String reference, String UID, RemoteObject remote) throws RemoteException, AlreadyBoundException { 243 bind(reference, UID, remote, environment); 244 } 245 242 246 /** 243 247 * Binds the reference to the specified remote object. This function uses … … 264 268 try { 265 269 remote.startRemoteObject(reference, this, env); 270 remoteObjs.put(reference, remote); 271 } catch (Exception e) { 272 throw new RemoteException(e); 273 } 274 } 275 276 public void bind(String reference, String UID, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException { 277 if (remoteObjs.containsKey(reference)) { 278 throw new AlreadyBoundException(reference); 279 } 280 // Try to start the remtoeObject listeners 281 try { 282 remote.startRemoteObject(reference, UID, this, env); 266 283 remoteObjs.put(reference, remote); 267 284 } catch (Exception e) { … … 400 417 * Supervisor 401 418 */ 402 public void setSupervisor(String supervisorName, String broker Name) throws Exception {419 public void setSupervisor(String supervisorName, String brokerSet, String brokerName) throws Exception { 403 420 // Create a RemoteBrokerImpl 404 bind(broker Name, new RemoteBrokerImpl());421 bind(brokerSet, brokerName, new RemoteBrokerImpl()); 405 422 // Subscribe broker 406 423 supervisor = lookup(supervisorName, Supervisor.class); 407 supervisor.subscribe(broker Name);408 logger.info("Supervisor set: " + supervisorName + ", Broker Name: " + brokerName);424 supervisor.subscribe(brokerSet, brokerName); 425 logger.info("Supervisor set: " + supervisorName + ", BrokerSet: " + brokerSet + ", BrokerName: " + brokerName); 409 426 } 410 427 -
branches/supervisor/src/main/java/omq/server/InvocationThread.java
r105 r106 36 36 private RemoteObject obj; 37 37 private String reference; 38 private String UID; 38 39 private Properties env; 39 40 private boolean idle; … … 54 55 public InvocationThread(RemoteObject obj) throws Exception { 55 56 this.obj = obj; 57 this.UID = obj.getUID(); 56 58 this.reference = obj.getRef(); 57 59 this.env = obj.getEnv(); … … 87 89 String serializerType = delivery.getProperties().getType(); 88 90 89 // Deserialize the json91 // Deserialize the request 90 92 Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj); 91 93 String methodName = request.getMethod(); … … 196 198 197 199 /* 200 * UID queue 201 */ 202 203 if (UID != null) { 204 205 boolean uidDurable = false; 206 boolean uidExclusive = true; 207 boolean uidAutoDelete = true; 208 209 channel.queueDeclare(UID, uidDurable, uidExclusive, uidAutoDelete, null); 210 if (!exchange.equalsIgnoreCase("")) { // Default exchange case 211 channel.queueBind(UID, exchange, UID); 212 } 213 } 214 215 /* 198 216 * Multi queue, exclusive per each instance 199 217 */ … … 233 251 channel.basicConsume(queue, autoAck, consumer); 234 252 channel.basicConsume(multiQueue, autoAck, consumer); 253 if (UID != null) { 254 channel.basicConsume(UID, autoAck, consumer); 255 } 235 256 } 236 257 -
branches/supervisor/src/main/java/omq/server/RemoteObject.java
r105 r106 31 31 32 32 private String reference; 33 private String UID; 33 34 private Properties env; 34 35 private transient Broker broker; … … 83 84 pool = new RemoteThreadPool(minPoolThreads, maxPoolThreads, refresh, keepAliveTime, maxMessagesPerThread, this, broker); 84 85 pool.start(); 86 } 87 88 public void startRemoteObject(String reference, String UID, Broker broker, Properties env) throws Exception { 89 this.UID = UID; 90 startRemoteObject(reference, broker, env); 85 91 } 86 92 … … 209 215 } 210 216 217 public String getUID() { 218 return UID; 219 } 220 221 public void setUID(String uID) { 222 UID = uID; 223 } 224 211 225 } -
branches/supervisor/src/main/java/omq/supervisor/Supervisor.java
r103 r106 2 2 3 3 import omq.Remote; 4 import omq.common.broker.HasObject; 4 5 5 6 public interface Supervisor extends Remote { 6 7 7 public void subscribe(String broker Name) throws Exception;8 public void subscribe(String brokerSet, String brokerName) throws Exception; 8 9 9 public void spawnObject(OmqSettings settings , int numObjects) throws Exception;10 public void spawnObject(OmqSettings settings) throws Exception; 10 11 11 public void unbindObject(OmqSettings settings, int numObjects) throws Exception; 12 public void spawnObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception; 13 14 public void unbindObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception; 12 15 13 16 } -
branches/supervisor/src/main/java/omq/supervisor/SupervisorImpl.java
r103 r106 5 5 import java.util.List; 6 6 import java.util.Map; 7 import java.util.Set; 7 8 9 import omq.common.broker.HasObject; 8 10 import omq.common.broker.RemoteBroker; 11 import omq.common.broker.RemoteMultiBroker; 12 import omq.exception.RemoteException; 9 13 import omq.server.RemoteObject; 10 14 11 15 import org.apache.log4j.Logger; 12 16 13 public class SupervisorImpl extends RemoteObject implements Supervisor { 17 import com.rabbitmq.client.Channel; 18 import com.rabbitmq.client.AMQP.Queue.DeclareOk; 19 20 public class SupervisorImpl extends RemoteObject implements Supervisor, Runnable { 14 21 15 22 /** … … 18 25 private static final long serialVersionUID = 1L; 19 26 private static final Logger logger = Logger.getLogger(SupervisorImpl.class.getName()); 20 private SupervisorThread thread; 27 28 private String brokerSet; 29 private long sleep; 21 30 private Map<String, OmqSettings> objectSettings; 22 31 // TODO: Set<?> 32 private RemoteMultiBroker multiBroker; 33 private Map<String, RemoteBroker> brokerMap; 23 34 private List<RemoteBroker> brokers; 24 35 25 public SupervisorImpl(long sleep) { 36 public SupervisorImpl(String brokerSet, long sleep) { 37 this.brokerSet = brokerSet; 38 this.sleep = sleep; 26 39 brokers = new ArrayList<RemoteBroker>(); 27 40 objectSettings = new HashMap<String, OmqSettings>(); 28 thread = new SupervisorThread(this, sleep);29 thread.start();30 41 } 31 42 32 43 @Override 33 public void subscribe(String brokerName) throws Exception { 34 logger.info("Broker " + brokerName + " subscrived"); 35 RemoteBroker broker = getBroker().lookup(brokerName, RemoteBroker.class); 36 brokers.add(broker); 44 public void run() { 45 try { 46 multiBroker = getBroker().lookup(brokerSet, RemoteMultiBroker.class); 47 while (true) { 48 try { 49 Set<String> keys = objectSettings.keySet(); 50 for (String reference : keys) { 51 System.out.println("key = " + keys); 52 checkObject(reference); 53 } 54 } catch (Exception e) { 55 e.printStackTrace(); 56 } 57 58 try { 59 Thread.sleep(sleep); 60 } catch (InterruptedException e) { 61 e.printStackTrace(); 62 } 63 } 64 } catch (RemoteException e1) { 65 // TODO Auto-generated catch block 66 e1.printStackTrace(); 67 } 37 68 } 38 69 39 70 @Override 40 public void spawnObject(OmqSettings settings, int numObjects) throws Exception { 71 public void subscribe(String brokerSet, String brokerName) throws Exception { 72 if (brokerSet.equals(brokerSet) && !brokerMap.containsKey(brokerName)) { 73 logger.info("Broker " + brokerName + " subscrived"); 74 RemoteBroker broker = getBroker().lookup(brokerName, RemoteBroker.class); 75 brokerMap.put(brokerSet, broker); 76 } else { 77 throw new Exception("blablabla"); 78 } 79 } 80 81 @Override 82 public void spawnObject(OmqSettings settings) throws Exception { 83 String reference = settings.getReference(); 84 85 if (objectSettings.containsKey(reference)) { 86 throw new Exception("JAJAJAJAJA"); 87 } 88 objectSettings.put(reference, settings); 89 } 90 91 @Override 92 public void spawnObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception { 41 93 42 94 String reference = settings.getReference(); … … 48 100 int minObjects = settings.getMinNumberObjects(); 49 101 50 for ( RemoteBroker broker : brokers) {51 if ( !broker.hasObject(reference) && minObjects >= numObjects) {52 broker .spawnObject(reference, settings.getClassName(), settings.getProps());102 for (HasObject h : hasList) { 103 if (h.hasObject() && minObjects >= numObjects) { 104 brokerMap.get(h.getBrokerName()).spawnObject(reference, settings.getClassName(), settings.getProps()); 53 105 numObjects++; 54 106 if (minObjects >= numObjects) { … … 61 113 62 114 @Override 63 public void unbindObject(OmqSettings settings, int numObjects) throws Exception {115 public void unbindObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception { 64 116 String reference = settings.getReference(); 65 117 … … 75 127 } 76 128 77 public SupervisorThread getThread() { 78 return thread; 79 } 129 private void checkObject(String reference) throws Exception { 130 OmqSettings settings = objectSettings.get(reference); 80 131 81 public void setThread(SupervisorThread thread) { 82 this.thread = thread; 132 int minObjects = settings.getMinNumberObjects(); 133 int maxMessages = settings.getMaxNumQueued(); 134 int minMessages = settings.getMinNumQueued(); 135 136 Channel channel = getBroker().getChannel(); 137 DeclareOk dok = channel.queueDeclarePassive(reference); 138 139 int numObjects = 0; 140 int numMessages = dok.getMessageCount(); 141 142 HasObject[] hasList = multiBroker.hasObjectInfo(reference); 143 for (HasObject h : hasList) { 144 if (h.hasObject()) { 145 numObjects++; 146 } 147 } 148 149 System.out.println("Num Consumers: " + numObjects + ", num Messages: " + numMessages); 150 151 if (maxMessages < numMessages || numObjects < minObjects) { 152 logger.info("SPAWN TIME!!"); 153 spawnObject(settings, hasList, numObjects); 154 } else if (numMessages < minMessages && minObjects < numObjects) { 155 logger.info("Unbinding object!!!"); 156 unbindObject(settings, hasList, numObjects); 157 } 83 158 } 84 159 … … 99 174 } 100 175 176 public String getBrokerSet() { 177 return brokerSet; 178 } 179 180 public void setBrokerSet(String brokerSet) { 181 this.brokerSet = brokerSet; 182 } 183 184 public RemoteMultiBroker getMultiBroker() { 185 return multiBroker; 186 } 187 188 public void setMultiBroker(RemoteMultiBroker multiBroker) { 189 this.multiBroker = multiBroker; 190 } 191 101 192 } -
branches/supervisor/src/main/resources/log4j.xml
r86 r106 4 4 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> 5 5 <appender name="consoleAppender" class="org.apache.log4j.ConsoleAppender"> 6 <param name="Threshold" value=" INFO" />6 <param name="Threshold" value="DEBUG" /> 7 7 <layout class="org.apache.log4j.PatternLayout"> 8 8 <param name="ConversionPattern" value="%d{[yyyy-MM-dd HH:mm:ss]} %-5p %c:%L - %m%n" /> … … 30 30 31 31 <root> 32 <priority value =" INFO" />32 <priority value ="DEBUG" /> 33 33 <appender-ref ref="consoleAppender" /> 34 34 <appender-ref ref="A2" /> -
branches/supervisor/src/test/java/omq/test/supervisor/SleepTest.java
r103 r106 13 13 14 14 public class SleepTest { 15 private static String brokerSet = "broker"; 15 16 16 17 @BeforeClass … … 42 43 43 44 // Set supervisor 44 SupervisorImpl supervisor = new SupervisorImpl( 500);45 SupervisorImpl supervisor = new SupervisorImpl(brokerSet, 500); 45 46 broker.bind("supervisor", supervisor); 47 Thread t = new Thread(supervisor); 48 t.start(); 46 49 47 broker.setSupervisor("supervisor", "b1");48 broker2.setSupervisor("supervisor", "b2");50 broker.setSupervisor("supervisor", brokerSet, "b1"); 51 broker2.setSupervisor("supervisor", brokerSet, "b2"); 49 52 } 50 53 … … 63 66 64 67 OmqSettings settings = new OmqSettings("sleep", SleepImpl.class.getName(), env, 1, 20, 20); 65 supervisor.spawnObject(settings , 0);68 supervisor.spawnObject(settings); 66 69 67 70 Sleep sleep = broker.lookup("sleep", Sleep.class);
Note: See TracChangeset
for help on using the changeset viewer.