Changeset 91
- Timestamp:
- 09/27/13 17:50:41 (11 years ago)
- Location:
- branches/supervisor
- Files:
-
- 12 added
- 1 deleted
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/common/broker/Broker.java
r83 r91 49 49 private boolean connectionClosed = false; 50 50 private Properties environment = null; 51 private RemoteBrokerImpl remoteBrokerImpl; 51 52 private Map<String, RemoteObject> remoteObjs; 52 53 private Map<String, Object> proxies = new Hashtable<String, Object>(); … … 357 358 } 358 359 360 public void setSupervisor(String brokerSet, String brokerName) throws Exception { 361 remoteBrokerImpl = new RemoteBrokerImpl(); 362 remoteBrokerImpl.startRemoteBroker(brokerSet, brokerName, this, getEnvironment()); 363 } 364 359 365 public Properties getEnvironment() { 360 366 return environment; … … 368 374 return serializer; 369 375 } 376 377 public Map<String, RemoteObject> getRemoteObjs() { 378 return remoteObjs; 379 } 380 370 381 } -
branches/supervisor/src/main/java/omq/server/InvocationThread.java
r83 r91 24 24 private static final Logger logger = Logger.getLogger(InvocationThread.class.getName()); 25 25 private RemoteObject obj; 26 private transient Serializer serializer; 26 private Serializer serializer; 27 // private RemoteWrapper wrapper; 27 28 private BlockingQueue<Delivery> deliveryQueue; 28 29 private boolean killed = false; 29 30 30 public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue, Serializer serializer) {31 public InvocationThread(RemoteObject obj, RemoteWrapper wrapper, Serializer serializer) { 31 32 this.obj = obj; 32 this.deliveryQueue = deliveryQueue; 33 // this.wrapper = wrapper; 34 this.deliveryQueue = wrapper.getDeliveryQueue(); 33 35 this.serializer = serializer; 34 36 } … … 40 42 // Get the delivery 41 43 Delivery delivery = deliveryQueue.take(); 44 45 // // Indicate this thread is not available 46 // wrapper.increaseBusy(); 42 47 43 48 String serializerType = delivery.getProperties().getType(); … … 64 69 } 65 70 71 72 Channel channel = obj.getChannel(); 73 74 66 75 // Reply if it's necessary 67 76 if (!request.isAsync()) { 68 77 Response resp = new Response(request.getId(), obj.getRef(), result, error); 69 78 70 Channel channel = obj.getChannel();79 71 80 72 81 BasicProperties props = delivery.getProperties(); … … 79 88 + props.getReplyTo()); 80 89 } 81 90 91 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 92 93 // // Indicate this thread is available 94 // wrapper.decreaseBusy(); 82 95 } catch (InterruptedException i) { 83 96 logger.error(i); -
branches/supervisor/src/main/java/omq/server/RemoteObject.java
r84 r91 38 38 private static final Logger logger = Logger.getLogger(RemoteObject.class.getName()); 39 39 40 pr ivateString UID;41 pr ivateProperties env;42 pr ivatetransient Broker broker;43 pr ivatetransient String multiQueue;44 pr ivatetransient RemoteWrapper remoteWrapper;45 pr ivatetransient Map<String, List<Class<?>>> params;46 pr ivatetransient Channel channel;47 pr ivatetransient QueueingConsumer consumer;48 pr ivatetransient boolean killed = false;40 protected String UID; 41 protected Properties env; 42 protected transient Broker broker; 43 protected transient String multiQueue; 44 protected transient RemoteWrapper remoteWrapper; 45 protected transient Map<String, List<Class<?>>> params; 46 protected transient Channel channel; 47 protected transient QueueingConsumer consumer; 48 protected transient boolean killed = false; 49 49 50 50 private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>(); … … 317 317 */ 318 318 319 boolean autoAck = false; 320 319 321 // Declare a new consumer 320 322 consumer = new QueueingConsumer(channel); 321 channel.basicConsume(queue, true, consumer);322 channel.basicConsume(multiQueue, true, consumer);323 channel.basicConsume(queue, autoAck, consumer); 324 channel.basicConsume(multiQueue, autoAck, consumer); 323 325 } 324 326 -
branches/supervisor/src/main/java/omq/server/RemoteWrapper.java
r83 r91 24 24 private RemoteObject obj; 25 25 private int numThreads; 26 // private AtomicInteger busy; 27 private Object waitLock; 26 28 private ArrayList<InvocationThread> invocationList; 27 29 private BlockingQueue<Delivery> deliveryQueue; … … 30 32 this.obj = obj; 31 33 this.numThreads = numThreads; 34 // this.busy = new AtomicInteger(0); 35 this.waitLock = new Object(); 32 36 invocationList = new ArrayList<InvocationThread>(); 33 37 deliveryQueue = new LinkedBlockingDeque<QueueingConsumer.Delivery>(); … … 36 40 37 41 for (int i = 0; i < numThreads; i++) { 38 InvocationThread thread = new InvocationThread(obj, deliveryQueue, serializer);42 InvocationThread thread = new InvocationThread(obj, this, serializer); 39 43 invocationList.add(thread); 40 44 thread.start(); … … 51 55 */ 52 56 public void notifyDelivery(Delivery delivery) throws Exception { 57 58 // // Ensure there is at least one thread available 59 // while (this.busy.get() == this.numThreads) { 60 // System.out.println("Waiting for a thread available"); 61 // logger.debug("Object reference: " + obj.getRef() + " is busy"); 62 // 63 // synchronized (waitLock) { 64 // waitLock.wait(); 65 // } 66 // } 67 // Notify an available thread 53 68 this.deliveryQueue.put(delivery); 69 54 70 } 55 71 … … 63 79 } 64 80 } 81 82 // public int increaseBusy() { 83 // return this.busy.incrementAndGet(); 84 // } 85 // 86 // public int decreaseBusy() { 87 // int value = this.busy.decrementAndGet(); 88 // synchronized (waitLock) { 89 // waitLock.notifyAll(); 90 // } 91 // return value; 92 // } 65 93 66 94 public RemoteObject getObj() { … … 95 123 this.deliveryQueue = deliveryQueue; 96 124 } 125 126 public Object getLock() { 127 return waitLock; 128 } 129 130 public void setLock(Object lock) { 131 this.waitLock = lock; 132 } 97 133 }
Note: See TracChangeset
for help on using the changeset viewer.