source: branches/supervisor/src/main/java/omq/common/broker/Broker.java @ 91

Last change on this file since 91 was 91, checked in by stoda, 11 years ago

Semaphores added and removed, ack error discovered and solutioned... Some tests added

Supervisor interface created and more things I'll do later...

TODO: supervisor!!

File size: 10.4 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.lang.reflect.Proxy;
5import java.net.URL;
6import java.util.HashMap;
7import java.util.Hashtable;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
13import omq.client.proxy.MultiProxymq;
14import omq.client.proxy.Proxymq;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.AlreadyBoundException;
19import omq.exception.InitBrokerException;
20import omq.exception.RemoteException;
21import omq.server.RemoteObject;
22
23import org.apache.log4j.Logger;
24import org.apache.log4j.xml.DOMConfigurator;
25
26import com.rabbitmq.client.Channel;
27import com.rabbitmq.client.Connection;
28import com.rabbitmq.client.QueueingConsumer;
29import com.rabbitmq.client.QueueingConsumer.Delivery;
30import com.rabbitmq.client.ShutdownListener;
31import com.rabbitmq.client.ShutdownSignalException;
32
33/**
34 * A "broker" allows a new connection to a RabbitMQ server. Under this
35 * connection it can have binded object and proxies.
36 *
37 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
38 *
39 */
40public class Broker {
41
42        private static final Logger logger = Logger.getLogger(Broker.class.getName());
43
44        private Connection connection;
45        private Channel channel;
46        private ResponseListener responseListener;
47        private Serializer serializer;
48        private boolean clientStarted = false;
49        private boolean connectionClosed = false;
50        private Properties environment = null;
51        private RemoteBrokerImpl remoteBrokerImpl;
52        private Map<String, RemoteObject> remoteObjs;
53        private Map<String, Object> proxies = new Hashtable<String, Object>();
54        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
55
56        public Broker(Properties env) throws Exception {
57                // Load log4j configuration
58                URL log4jResource = Broker.class.getResource("/log4j.xml");
59                DOMConfigurator.configure(log4jResource);
60
61                remoteObjs = new HashMap<String, RemoteObject>();
62                serializer = new Serializer(env);
63                environment = env;
64                connection = OmqConnectionFactory.getNewConnection(env);
65                channel = connection.createChannel();
66                addFaultTolerance();
67                try {
68                        tryConnection(env);
69                } catch (Exception e) {
70                        channel.close();
71                        connection.close();
72                        throw new InitBrokerException("The connection didn't work");
73                }
74        }
75
76        /**
77         * This method stops the broker's connection and all the threads created
78         *
79         * @throws Exception
80         */
81        public void stopBroker() throws Exception {
82                logger.warn("Stopping broker");
83                // Stop the client
84                if (clientStarted) {
85                        responseListener.kill();
86                        // TODO proxies = null; ??
87                }
88                // Stop all the remote objects working
89                for (String reference : remoteObjs.keySet()) {
90                        unbind(reference);
91                }
92
93                // Close the connection once all the listeners are died
94                closeConnection();
95
96                clientStarted = false;
97                connectionClosed = false;
98                environment = null;
99                remoteObjs = null;
100        }
101
102        /**
103         * @return Broker's connection
104         * @throws Exception
105         */
106        public Connection getConnection() throws Exception {
107                return connection;
108        }
109
110        /**
111         * This method close the broker's connection
112         *
113         * @throws IOException
114         */
115        public void closeConnection() throws IOException {
116                logger.warn("Clossing connection");
117                connectionClosed = true;
118                connection.close();
119                connectionClosed = false;
120        }
121
122        /**
123         * Return the broker's channel
124         *
125         * @return Broker's channel
126         * @throws Exception
127         */
128        public Channel getChannel() throws Exception {
129                return channel;
130        }
131
132        /**
133         * Creates a new channel using the Broker's connection
134         *
135         * @return newChannel
136         * @throws IOException
137         */
138        public Channel getNewChannel() throws IOException {
139                return connection.createChannel();
140        }
141
142        /**
143         * Returns the remote object for specified reference.
144         *
145         * @param reference
146         *            - Binding name
147         * @param contract
148         *            - Remote Interface
149         * @return newProxy
150         * @throws RemoteException
151         */
152        @SuppressWarnings("unchecked")
153        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
154                try {
155
156                        if (!clientStarted) {
157                                initClient();
158                        }
159
160                        if (!proxies.containsKey(reference)) {
161                                Proxymq proxy = new Proxymq(reference, contract, this);
162                                Class<?>[] array = { contract };
163                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
164                                proxies.put(reference, newProxy);
165                                return (T) newProxy;
166                        }
167                        return (T) proxies.get(reference);
168
169                } catch (Exception e) {
170                        throw new RemoteException(e);
171                }
172        }
173
174        /**
175         * Returns the remote object for specified reference. This function returns
176         * an special type of proxy, every method invoked will be multi and
177         * asynchronous.
178         *
179         * @param reference
180         *            - Binding name
181         * @param contract
182         *            - Remote Interface
183         * @return newProxy
184         * @throws RemoteException
185         */
186        @SuppressWarnings("unchecked")
187        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
188                try {
189                        if (!multiProxies.containsKey(reference)) {
190                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
191                                Class<?>[] array = { contract };
192                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
193                                multiProxies.put(reference, newProxy);
194                                return (T) newProxy;
195                        }
196                        return (T) multiProxies.get(reference);
197
198                } catch (Exception e) {
199                        throw new RemoteException(e);
200                }
201        }
202
203        /**
204         * Binds the reference to the specified remote object. This function uses
205         * the broker's environment
206         *
207         * @param reference
208         *            - Binding name
209         * @param remote
210         *            - RemoteObject to bind
211         * @throws RemoteException
212         *             If the remote operation failed
213         * @throws AlreadyBoundException
214         *             If name is already bound.
215         */
216        public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException {
217                bind(reference, remote, environment);
218        }
219
220        /**
221         * Binds the reference to the specified remote object. This function uses
222         * the broker's environment
223         *
224         * @param reference
225         *            - Binding name
226         * @param remote
227         *            - RemoteObject to bind
228         * @param env
229         *            - RemoteObject environment. You can set how many threads will
230         *            be listen to the reference, the multiqueue name and the
231         *            properties of the object queue and multiqueue
232         * @throws RemoteException
233         *             If the remote operation failed
234         * @throws AlreadyBoundException
235         *             If name is already bound.
236         */
237        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
238                if (remoteObjs.containsKey(reference)) {
239                        throw new AlreadyBoundException(reference);
240                }
241                // Try to start the remtoeObject listeners
242                try {
243                        remote.startRemoteObject(reference, this, env);
244                        remoteObjs.put(reference, remote);
245                } catch (Exception e) {
246                        throw new RemoteException(e);
247                }
248        }
249
250        /**
251         * Unbinds a remoteObject from its reference and kills all the threads
252         * created.
253         *
254         * @param reference
255         *            - Binding name
256         * @throws RemoteException
257         *             If the remote operation failed
258         * @throws IOException
259         *             If there are problems while killing the threads
260         */
261        public void unbind(String reference) throws RemoteException, IOException {
262                if (remoteObjs.containsKey(reference)) {
263                        RemoteObject remote = remoteObjs.get(reference);
264                        remote.kill();
265                } else {
266                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
267                }
268
269        }
270
271        /**
272         * This method ensures the client will have only one ResponseListener.
273         *
274         * @throws Exception
275         */
276        private synchronized void initClient() throws Exception {
277                if (responseListener == null) {
278                        responseListener = new ResponseListener(this);
279                        responseListener.start();
280                        clientStarted = true;
281                }
282        }
283
284        /**
285         * This function is used to send a ping message to see if the connection
286         * works
287         *
288         * @param env
289         * @throws Exception
290         */
291        public void tryConnection(Properties env) throws Exception {
292                Channel channel = connection.createChannel();
293                String message = "ping";
294
295                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
296                String queueName = exchange;
297                String routingKey = "routingKey";
298
299                channel.exchangeDeclare(exchange, "direct");
300                channel.queueDeclare(queueName, false, false, false, null);
301                channel.queueBind(queueName, exchange, routingKey);
302
303                channel.basicPublish(exchange, routingKey, null, message.getBytes());
304
305                QueueingConsumer consumer = new QueueingConsumer(channel);
306
307                channel.basicConsume(queueName, true, consumer);
308                Delivery delivery = consumer.nextDelivery(1000);
309
310                channel.exchangeDelete(exchange);
311                channel.queueDelete(queueName);
312
313                channel.close();
314
315                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
316                        throw new IOException("Ping initialitzation has failed");
317                }
318        }
319
320        /**
321         * This method adds a ShutdownListener to the Broker's connection. When this
322         * connection falls, a new connection will be created and this will also
323         * have the listener.
324         */
325        private void addFaultTolerance() {
326                connection.addShutdownListener(new ShutdownListener() {
327                        @Override
328                        public void shutdownCompleted(ShutdownSignalException cause) {
329                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
330                                if (!connectionClosed)
331                                        if (cause.isHardError()) {
332                                                if (connection.isOpen()) {
333                                                        try {
334                                                                connection.close();
335                                                        } catch (IOException e) {
336                                                                e.printStackTrace();
337                                                        }
338                                                }
339                                                try {
340                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
341                                                        channel = connection.createChannel();
342                                                        addFaultTolerance();
343                                                } catch (Exception e) {
344                                                        e.printStackTrace();
345                                                }
346                                        } else {
347                                                Channel channel = (Channel) cause.getReference();
348                                                if (channel.isOpen()) {
349                                                        try {
350                                                                channel.close();
351                                                        } catch (IOException e) {
352                                                                e.printStackTrace();
353                                                        }
354                                                }
355                                        }
356                        }
357                });
358        }
359
360        public void setSupervisor(String brokerSet, String brokerName) throws Exception {
361                remoteBrokerImpl = new RemoteBrokerImpl();
362                remoteBrokerImpl.startRemoteBroker(brokerSet, brokerName, this, getEnvironment());
363        }
364
365        public Properties getEnvironment() {
366                return environment;
367        }
368
369        public ResponseListener getResponseListener() {
370                return responseListener;
371        }
372
373        public Serializer getSerializer() {
374                return serializer;
375        }
376
377        public Map<String, RemoteObject> getRemoteObjs() {
378                return remoteObjs;
379        }
380
381}
Note: See TracBrowser for help on using the repository browser.