source: branches/supervisor/src/main/java/omq/server/RemoteWrapper.java @ 92

Last change on this file since 92 was 91, checked in by stoda, 11 years ago

Semaphores added and removed, ack error discovered and solutioned... Some tests added

Supervisor interface created and more things I'll do later...

TODO: supervisor!!

File size: 3.3 KB
Line 
1package omq.server;
2
3import java.util.ArrayList;
4import java.util.concurrent.BlockingQueue;
5import java.util.concurrent.LinkedBlockingDeque;
6
7import omq.common.util.Serializer;
8
9import org.apache.log4j.Logger;
10
11import com.rabbitmq.client.QueueingConsumer;
12import com.rabbitmq.client.QueueingConsumer.Delivery;
13
14/**
15 * This class is used to encapsulate the invocationThreads under the
16 * RemoteObject.
17 *
18 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
19 *
20 */
21public class RemoteWrapper {
22        private static final Logger logger = Logger.getLogger(RemoteWrapper.class.getName());
23
24        private RemoteObject obj;
25        private int numThreads;
26        // private AtomicInteger busy;
27        private Object waitLock;
28        private ArrayList<InvocationThread> invocationList;
29        private BlockingQueue<Delivery> deliveryQueue;
30
31        public RemoteWrapper(RemoteObject obj, int numThreads, Serializer serializer) {
32                this.obj = obj;
33                this.numThreads = numThreads;
34                // this.busy = new AtomicInteger(0);
35                this.waitLock = new Object();
36                invocationList = new ArrayList<InvocationThread>();
37                deliveryQueue = new LinkedBlockingDeque<QueueingConsumer.Delivery>();
38
39                logger.info("Object reference: " + obj.getRef() + ", numthreads listening = " + numThreads);
40
41                for (int i = 0; i < numThreads; i++) {
42                        InvocationThread thread = new InvocationThread(obj, this, serializer);
43                        invocationList.add(thread);
44                        thread.start();
45                }
46        }
47
48        /**
49         * This method notifies a delivery to an invocationThread using a
50         * blockingQueue.
51         *
52         * @param delivery
53         *            - delivery which contains a Request to be invoked
54         * @throws Exception
55         */
56        public void notifyDelivery(Delivery delivery) throws Exception {
57
58                // // Ensure there is at least one thread available
59                // while (this.busy.get() == this.numThreads) {
60                // System.out.println("Waiting for a thread available");
61                // logger.debug("Object reference: " + obj.getRef() + " is busy");
62                //
63                // synchronized (waitLock) {
64                // waitLock.wait();
65                // }
66                // }
67                // Notify an available thread
68                this.deliveryQueue.put(delivery);
69
70        }
71
72        /**
73         * This method interrups all the invocationThreads under this remoteWrapper
74         */
75        public void stopRemoteWrapper() {
76                logger.warn("Stopping Invocation threads vinculed to " + obj.getRef());
77                for (InvocationThread thread : invocationList) {
78                        thread.interrupt();
79                }
80        }
81
82        // public int increaseBusy() {
83        // return this.busy.incrementAndGet();
84        // }
85        //
86        // public int decreaseBusy() {
87        // int value = this.busy.decrementAndGet();
88        // synchronized (waitLock) {
89        // waitLock.notifyAll();
90        // }
91        // return value;
92        // }
93
94        public RemoteObject getObj() {
95                return obj;
96        }
97
98        public void setObj(RemoteObject obj) {
99                this.obj = obj;
100        }
101
102        public int getNumThreads() {
103                return numThreads;
104        }
105
106        public void setNumThreads(int numThreads) {
107                this.numThreads = numThreads;
108        }
109
110        public ArrayList<InvocationThread> getInvocationList() {
111                return invocationList;
112        }
113
114        public void setInvocationList(ArrayList<InvocationThread> invocationList) {
115                this.invocationList = invocationList;
116        }
117
118        public BlockingQueue<Delivery> getDeliveryQueue() {
119                return deliveryQueue;
120        }
121
122        public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
123                this.deliveryQueue = deliveryQueue;
124        }
125
126        public Object getLock() {
127                return waitLock;
128        }
129
130        public void setLock(Object lock) {
131                this.waitLock = lock;
132        }
133}
Note: See TracBrowser for help on using the repository browser.