Ignore:
Timestamp:
09/27/13 17:50:41 (11 years ago)
Author:
stoda
Message:

Semaphores added and removed, ack error discovered and solutioned... Some tests added

Supervisor interface created and more things I'll do later...

TODO: supervisor!!

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/server/RemoteWrapper.java

    r83 r91  
    2424        private RemoteObject obj;
    2525        private int numThreads;
     26        // private AtomicInteger busy;
     27        private Object waitLock;
    2628        private ArrayList<InvocationThread> invocationList;
    2729        private BlockingQueue<Delivery> deliveryQueue;
     
    3032                this.obj = obj;
    3133                this.numThreads = numThreads;
     34                // this.busy = new AtomicInteger(0);
     35                this.waitLock = new Object();
    3236                invocationList = new ArrayList<InvocationThread>();
    3337                deliveryQueue = new LinkedBlockingDeque<QueueingConsumer.Delivery>();
     
    3640
    3741                for (int i = 0; i < numThreads; i++) {
    38                         InvocationThread thread = new InvocationThread(obj, deliveryQueue, serializer);
     42                        InvocationThread thread = new InvocationThread(obj, this, serializer);
    3943                        invocationList.add(thread);
    4044                        thread.start();
     
    5155         */
    5256        public void notifyDelivery(Delivery delivery) throws Exception {
     57
     58                // // Ensure there is at least one thread available
     59                // while (this.busy.get() == this.numThreads) {
     60                // System.out.println("Waiting for a thread available");
     61                // logger.debug("Object reference: " + obj.getRef() + " is busy");
     62                //
     63                // synchronized (waitLock) {
     64                // waitLock.wait();
     65                // }
     66                // }
     67                // Notify an available thread
    5368                this.deliveryQueue.put(delivery);
     69
    5470        }
    5571
     
    6379                }
    6480        }
     81
     82        // public int increaseBusy() {
     83        // return this.busy.incrementAndGet();
     84        // }
     85        //
     86        // public int decreaseBusy() {
     87        // int value = this.busy.decrementAndGet();
     88        // synchronized (waitLock) {
     89        // waitLock.notifyAll();
     90        // }
     91        // return value;
     92        // }
    6593
    6694        public RemoteObject getObj() {
     
    95123                this.deliveryQueue = deliveryQueue;
    96124        }
     125
     126        public Object getLock() {
     127                return waitLock;
     128        }
     129
     130        public void setLock(Object lock) {
     131                this.waitLock = lock;
     132        }
    97133}
Note: See TracChangeset for help on using the changeset viewer.