source: branches/supervisor/src/main/java/omq/common/broker/RemoteBrokerImpl.java @ 91

Last change on this file since 91 was 91, checked in by stoda, 11 years ago

Semaphores added and removed, ack error discovered and solutioned... Some tests added

Supervisor interface created and more things I'll do later...

TODO: supervisor!!

File size: 2.8 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.HashMap;
7import java.util.List;
8import java.util.Properties;
9import java.util.Set;
10
11import omq.common.util.ParameterQueue;
12import omq.exception.RemoteException;
13import omq.server.RemoteObject;
14import omq.server.RemoteWrapper;
15
16import com.rabbitmq.client.QueueingConsumer;
17
18/**
19 *
20 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
21 *
22 */
23public class RemoteBrokerImpl extends RemoteObject implements RemoteBroker {
24
25        /**
26         *
27         */
28        private static final long serialVersionUID = 1L;
29
30        // fanout broker
31        private String brokerSet;
32        // id broker
33        private String brokerName;
34
35        public void startRemoteBroker(String brokerSet, String brokerName, Broker broker, Properties env) throws Exception {
36                this.broker = broker;
37                this.UID = brokerName;
38                this.env = env;
39                this.brokerSet = brokerSet;
40                this.brokerName = brokerName;
41
42                this.params = new HashMap<String, List<Class<?>>>();
43                for (Method m : this.getClass().getMethods()) {
44                        List<Class<?>> list = new ArrayList<Class<?>>();
45                        for (Class<?> clazz : m.getParameterTypes()) {
46                                list.add(clazz);
47                        }
48                        this.params.put(m.getName(), list);
49                }
50
51                // Get num threads to use
52                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
53                this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
54
55                startQueues();
56
57                // Start this listener
58                this.start();
59        }
60
61        private void startQueues() throws Exception {
62                /*
63                 * Unique queue
64                 */
65                channel.exchangeDeclare(brokerSet, "direct");
66                channel.queueDeclare(brokerName, false, true, true, null);
67                channel.queueBind(brokerName, brokerSet, brokerName);
68
69                /*
70                 * Multi queue
71                 */
72                channel.exchangeDeclare("multi#" + brokerSet, "fanout");
73                channel.queueDeclare("multi#" + brokerName, false, true, true, null);
74                channel.queueBind("multi#" + brokerName, "multi#" + brokerSet, "");
75
76                /*
77                 * Consumer
78                 */
79
80                consumer = new QueueingConsumer(channel);
81                channel.basicConsume(brokerName, true, consumer);
82                channel.basicConsume(brokerName + "#multi", true, consumer);
83        }
84
85        @Override
86        public Set<String> getRemoteObjects() {
87                return this.broker.getRemoteObjs().keySet();
88        }
89
90        @Override
91        public void spawnObject(String reference, String className, Class<?> parameterTypes, Object... args) throws Exception {
92                RemoteObject remote = (RemoteObject) Class.forName(className).getConstructor(parameterTypes).newInstance(args);
93                this.broker.bind(reference, remote);
94        }
95
96        @Override
97        public void deleteObject(String reference) throws RemoteException, IOException {
98                this.broker.unbind(reference);
99        }
100
101        @Override
102        public HasObject hasObject(String reference) {
103                boolean hasIt = this.broker.getRemoteObjs().containsKey(reference);
104                return new HasObject(this.brokerName, reference, hasIt);
105        }
106
107}
Note: See TracBrowser for help on using the repository browser.