source: trunk/src/main/java/omq/common/broker/Broker.java @ 103

Last change on this file since 103 was 98, checked in by stoda, 11 years ago

0.5.6
Synchronized channel and reopening when they are closed

File size: 10.5 KB
RevLine 
[44]1package omq.common.broker;
2
3import java.io.IOException;
[70]4import java.lang.reflect.Proxy;
[49]5import java.net.URL;
[44]6import java.util.HashMap;
[59]7import java.util.Hashtable;
[44]8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
[70]13import omq.client.proxy.MultiProxymq;
[44]14import omq.client.proxy.Proxymq;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
[83]18import omq.exception.AlreadyBoundException;
[44]19import omq.exception.InitBrokerException;
20import omq.exception.RemoteException;
21import omq.server.RemoteObject;
22
[49]23import org.apache.log4j.Logger;
24import org.apache.log4j.xml.DOMConfigurator;
25
[44]26import com.rabbitmq.client.Channel;
27import com.rabbitmq.client.Connection;
28import com.rabbitmq.client.QueueingConsumer;
[98]29import com.rabbitmq.client.AMQP.BasicProperties;
[44]30import com.rabbitmq.client.QueueingConsumer.Delivery;
31import com.rabbitmq.client.ShutdownListener;
32import com.rabbitmq.client.ShutdownSignalException;
33
[83]34/**
35 * A "broker" allows a new connection to a RabbitMQ server. Under this
36 * connection it can have binded object and proxies.
37 *
38 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
39 *
40 */
[44]41public class Broker {
[49]42
43        private static final Logger logger = Logger.getLogger(Broker.class.getName());
44
[53]45        private Connection connection;
46        private Channel channel;
47        private ResponseListener responseListener;
48        private Serializer serializer;
49        private boolean clientStarted = false;
50        private boolean connectionClosed = false;
51        private Properties environment = null;
52        private Map<String, RemoteObject> remoteObjs;
[59]53        private Map<String, Object> proxies = new Hashtable<String, Object>();
[70]54        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
[44]55
[53]56        public Broker(Properties env) throws Exception {
57                // Load log4j configuration
58                URL log4jResource = Broker.class.getResource("/log4j.xml");
59                DOMConfigurator.configure(log4jResource);
[49]60
[53]61                remoteObjs = new HashMap<String, RemoteObject>();
62                serializer = new Serializer(env);
63                environment = env;
64                connection = OmqConnectionFactory.getNewConnection(env);
65                channel = connection.createChannel();
66                addFaultTolerance();
67                try {
68                        tryConnection(env);
69                } catch (Exception e) {
70                        channel.close();
71                        connection.close();
72                        throw new InitBrokerException("The connection didn't work");
[44]73                }
74        }
75
[83]76        /**
77         * This method stops the broker's connection and all the threads created
78         *
79         * @throws Exception
80         */
[53]81        public void stopBroker() throws Exception {
[49]82                logger.warn("Stopping broker");
[44]83                // Stop the client
84                if (clientStarted) {
[53]85                        responseListener.kill();
[70]86                        // TODO proxies = null; ??
[44]87                }
88                // Stop all the remote objects working
89                for (String reference : remoteObjs.keySet()) {
90                        unbind(reference);
91                }
[47]92
[44]93                // Close the connection once all the listeners are died
94                closeConnection();
[47]95
96                clientStarted = false;
97                connectionClosed = false;
98                environment = null;
99                remoteObjs = null;
[44]100        }
101
102        /**
103         * @return Broker's connection
104         * @throws Exception
105         */
[53]106        public Connection getConnection() throws Exception {
[44]107                return connection;
108        }
109
[83]110        /**
111         * This method close the broker's connection
112         *
113         * @throws IOException
114         */
[53]115        public void closeConnection() throws IOException {
[49]116                logger.warn("Clossing connection");
[44]117                connectionClosed = true;
118                connection.close();
[47]119                connectionClosed = false;
[44]120        }
121
122        /**
[83]123         * Return the broker's channel
[44]124         *
125         * @return Broker's channel
126         * @throws Exception
127         */
[98]128        public synchronized Channel getChannel() throws Exception {
[44]129                return channel;
130        }
131
132        /**
133         * Creates a new channel using the Broker's connection
134         *
135         * @return newChannel
136         * @throws IOException
137         */
[98]138        public synchronized Channel getNewChannel() throws IOException {
[44]139                return connection.createChannel();
140        }
141
[83]142        /**
[98]143         *
144         */
145        public synchronized void publishMessge(String exchange, String routingKey, BasicProperties props, byte[] bytesRequest) throws IOException {
146                if (!channel.isOpen()) {
147                        logger.error("Broker's channel is closed opening a new one", channel.getCloseReason());
148                        channel = getNewChannel();
149                }
150                channel.basicPublish(exchange, routingKey, props, bytesRequest);
151        }
152
153        /**
[83]154         * Returns the remote object for specified reference.
155         *
156         * @param reference
157         *            - Binding name
158         * @param contract
159         *            - Remote Interface
160         * @return newProxy
161         * @throws RemoteException
162         */
[44]163        @SuppressWarnings("unchecked")
[59]164        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
[44]165                try {
166
167                        if (!clientStarted) {
[83]168                                initClient();
[44]169                        }
170
[59]171                        if (!proxies.containsKey(reference)) {
[53]172                                Proxymq proxy = new Proxymq(reference, contract, this);
[44]173                                Class<?>[] array = { contract };
[70]174                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
[59]175                                proxies.put(reference, newProxy);
176                                return (T) newProxy;
[44]177                        }
[59]178                        return (T) proxies.get(reference);
[44]179
180                } catch (Exception e) {
181                        throw new RemoteException(e);
182                }
183        }
184
[83]185        /**
186         * Returns the remote object for specified reference. This function returns
187         * an special type of proxy, every method invoked will be multi and
188         * asynchronous.
189         *
190         * @param reference
191         *            - Binding name
192         * @param contract
193         *            - Remote Interface
194         * @return newProxy
195         * @throws RemoteException
196         */
[70]197        @SuppressWarnings("unchecked")
198        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
199                try {
200                        if (!multiProxies.containsKey(reference)) {
201                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
202                                Class<?>[] array = { contract };
203                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
204                                multiProxies.put(reference, newProxy);
205                                return (T) newProxy;
206                        }
207                        return (T) multiProxies.get(reference);
208
209                } catch (Exception e) {
210                        throw new RemoteException(e);
211                }
212        }
213
[83]214        /**
215         * Binds the reference to the specified remote object. This function uses
216         * the broker's environment
217         *
218         * @param reference
219         *            - Binding name
220         * @param remote
221         *            - RemoteObject to bind
222         * @throws RemoteException
223         *             If the remote operation failed
224         * @throws AlreadyBoundException
225         *             If name is already bound.
226         */
227        public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException {
[74]228                bind(reference, remote, environment);
[44]229        }
[70]230
[83]231        /**
232         * Binds the reference to the specified remote object. This function uses
233         * the broker's environment
234         *
235         * @param reference
236         *            - Binding name
237         * @param remote
238         *            - RemoteObject to bind
239         * @param env
240         *            - RemoteObject environment. You can set how many threads will
241         *            be listen to the reference, the multiqueue name and the
242         *            properties of the object queue and multiqueue
243         * @throws RemoteException
244         *             If the remote operation failed
245         * @throws AlreadyBoundException
246         *             If name is already bound.
247         */
248        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
249                if (remoteObjs.containsKey(reference)) {
250                        throw new AlreadyBoundException(reference);
251                }
252                // Try to start the remtoeObject listeners
[66]253                try {
[74]254                        remote.startRemoteObject(reference, this, env);
[66]255                        remoteObjs.put(reference, remote);
256                } catch (Exception e) {
257                        throw new RemoteException(e);
258                }
259        }
[44]260
[83]261        /**
262         * Unbinds a remoteObject from its reference and kills all the threads
263         * created.
264         *
265         * @param reference
266         *            - Binding name
267         * @throws RemoteException
268         *             If the remote operation failed
269         * @throws IOException
270         *             If there are problems while killing the threads
271         */
[53]272        public void unbind(String reference) throws RemoteException, IOException {
[44]273                if (remoteObjs.containsKey(reference)) {
274                        RemoteObject remote = remoteObjs.get(reference);
275                        remote.kill();
276                } else {
277                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
278                }
279
280        }
281
282        /**
[83]283         * This method ensures the client will have only one ResponseListener.
[44]284         *
285         * @throws Exception
286         */
[83]287        private synchronized void initClient() throws Exception {
[53]288                if (responseListener == null) {
289                        responseListener = new ResponseListener(this);
[54]290                        responseListener.start();
[83]291                        clientStarted = true;
[44]292                }
293        }
294
295        /**
296         * This function is used to send a ping message to see if the connection
297         * works
298         *
299         * @param env
300         * @throws Exception
301         */
[53]302        public void tryConnection(Properties env) throws Exception {
[44]303                Channel channel = connection.createChannel();
304                String message = "ping";
305
306                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
307                String queueName = exchange;
308                String routingKey = "routingKey";
309
310                channel.exchangeDeclare(exchange, "direct");
311                channel.queueDeclare(queueName, false, false, false, null);
312                channel.queueBind(queueName, exchange, routingKey);
313
314                channel.basicPublish(exchange, routingKey, null, message.getBytes());
315
316                QueueingConsumer consumer = new QueueingConsumer(channel);
317
318                channel.basicConsume(queueName, true, consumer);
319                Delivery delivery = consumer.nextDelivery(1000);
320
321                channel.exchangeDelete(exchange);
322                channel.queueDelete(queueName);
323
324                channel.close();
325
326                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
327                        throw new IOException("Ping initialitzation has failed");
328                }
329        }
330
331        /**
332         * This method adds a ShutdownListener to the Broker's connection. When this
333         * connection falls, a new connection will be created and this will also
334         * have the listener.
335         */
[53]336        private void addFaultTolerance() {
[44]337                connection.addShutdownListener(new ShutdownListener() {
338                        @Override
339                        public void shutdownCompleted(ShutdownSignalException cause) {
[49]340                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
[44]341                                if (!connectionClosed)
342                                        if (cause.isHardError()) {
343                                                if (connection.isOpen()) {
344                                                        try {
345                                                                connection.close();
346                                                        } catch (IOException e) {
347                                                                e.printStackTrace();
348                                                        }
349                                                }
350                                                try {
[47]351                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
[44]352                                                        channel = connection.createChannel();
353                                                        addFaultTolerance();
354                                                } catch (Exception e) {
355                                                        e.printStackTrace();
356                                                }
357                                        } else {
358                                                Channel channel = (Channel) cause.getReference();
359                                                if (channel.isOpen()) {
360                                                        try {
361                                                                channel.close();
362                                                        } catch (IOException e) {
363                                                                e.printStackTrace();
364                                                        }
365                                                }
366                                        }
367                        }
368                });
369        }
370
[53]371        public Properties getEnvironment() {
[47]372                return environment;
373        }
374
[53]375        public ResponseListener getResponseListener() {
376                return responseListener;
377        }
378
379        public Serializer getSerializer() {
380                return serializer;
381        }
[44]382}
Note: See TracBrowser for help on using the repository browser.