source: trunk/src/main/java/omq/server/RemoteWrapper.java @ 53

Last change on this file since 53 was 53, checked in by stoda, 11 years ago

Non static broker
TODO: change all test to see whether the new broker configuration works

File size: 2.1 KB
Line 
1package omq.server;
2
3import java.util.ArrayList;
4import java.util.concurrent.BlockingQueue;
5import java.util.concurrent.LinkedBlockingDeque;
6
7import omq.common.util.Serializer;
8
9import org.apache.log4j.Logger;
10
11import com.rabbitmq.client.QueueingConsumer;
12import com.rabbitmq.client.QueueingConsumer.Delivery;
13
14/**
15 *
16 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
17 *
18 */
19public class RemoteWrapper {
20        private static final Logger logger = Logger.getLogger(RemoteWrapper.class.getName());
21
22        private RemoteObject obj;
23        private int numThreads;
24        private ArrayList<InvocationThread> invocationList;
25        private BlockingQueue<Delivery> deliveryQueue;
26
27        public RemoteWrapper(RemoteObject obj, int numThreads, Serializer serializer) {
28                this.obj = obj;
29                this.numThreads = numThreads;
30                invocationList = new ArrayList<InvocationThread>();
31                deliveryQueue = new LinkedBlockingDeque<QueueingConsumer.Delivery>();
32
33                logger.info("Object reference: " + obj.getRef() + ", numthreads listening = " + numThreads);
34
35                for (int i = 0; i < numThreads; i++) {
36                        InvocationThread thread = new InvocationThread(obj, deliveryQueue, serializer);
37                        invocationList.add(thread);
38                        thread.start();
39                }
40        }
41
42        public void notifyDelivery(Delivery delivery) throws Exception {
43                this.deliveryQueue.put(delivery);
44        }
45
46        public void stopRemoteWrapper() {
47                logger.warn("Stopping Invocation threads vinculed to " + obj.getRef());
48                for (InvocationThread thread : invocationList) {
49                        thread.interrupt();
50                }
51        }
52
53        public RemoteObject getObj() {
54                return obj;
55        }
56
57        public void setObj(RemoteObject obj) {
58                this.obj = obj;
59        }
60
61        public int getNumThreads() {
62                return numThreads;
63        }
64
65        public void setNumThreads(int numThreads) {
66                this.numThreads = numThreads;
67        }
68
69        public ArrayList<InvocationThread> getInvocationList() {
70                return invocationList;
71        }
72
73        public void setInvocationList(ArrayList<InvocationThread> invocationList) {
74                this.invocationList = invocationList;
75        }
76
77        public BlockingQueue<Delivery> getDeliveryQueue() {
78                return deliveryQueue;
79        }
80
81        public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
82                this.deliveryQueue = deliveryQueue;
83        }
84}
Note: See TracBrowser for help on using the repository browser.