source: trunk/src/main/java/omq/common/broker/Broker.java @ 98

Last change on this file since 98 was 98, checked in by stoda, 11 years ago

0.5.6
Synchronized channel and reopening when they are closed

File size: 10.5 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.lang.reflect.Proxy;
5import java.net.URL;
6import java.util.HashMap;
7import java.util.Hashtable;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
13import omq.client.proxy.MultiProxymq;
14import omq.client.proxy.Proxymq;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.AlreadyBoundException;
19import omq.exception.InitBrokerException;
20import omq.exception.RemoteException;
21import omq.server.RemoteObject;
22
23import org.apache.log4j.Logger;
24import org.apache.log4j.xml.DOMConfigurator;
25
26import com.rabbitmq.client.Channel;
27import com.rabbitmq.client.Connection;
28import com.rabbitmq.client.QueueingConsumer;
29import com.rabbitmq.client.AMQP.BasicProperties;
30import com.rabbitmq.client.QueueingConsumer.Delivery;
31import com.rabbitmq.client.ShutdownListener;
32import com.rabbitmq.client.ShutdownSignalException;
33
34/**
35 * A "broker" allows a new connection to a RabbitMQ server. Under this
36 * connection it can have binded object and proxies.
37 *
38 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
39 *
40 */
41public class Broker {
42
43        private static final Logger logger = Logger.getLogger(Broker.class.getName());
44
45        private Connection connection;
46        private Channel channel;
47        private ResponseListener responseListener;
48        private Serializer serializer;
49        private boolean clientStarted = false;
50        private boolean connectionClosed = false;
51        private Properties environment = null;
52        private Map<String, RemoteObject> remoteObjs;
53        private Map<String, Object> proxies = new Hashtable<String, Object>();
54        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
55
56        public Broker(Properties env) throws Exception {
57                // Load log4j configuration
58                URL log4jResource = Broker.class.getResource("/log4j.xml");
59                DOMConfigurator.configure(log4jResource);
60
61                remoteObjs = new HashMap<String, RemoteObject>();
62                serializer = new Serializer(env);
63                environment = env;
64                connection = OmqConnectionFactory.getNewConnection(env);
65                channel = connection.createChannel();
66                addFaultTolerance();
67                try {
68                        tryConnection(env);
69                } catch (Exception e) {
70                        channel.close();
71                        connection.close();
72                        throw new InitBrokerException("The connection didn't work");
73                }
74        }
75
76        /**
77         * This method stops the broker's connection and all the threads created
78         *
79         * @throws Exception
80         */
81        public void stopBroker() throws Exception {
82                logger.warn("Stopping broker");
83                // Stop the client
84                if (clientStarted) {
85                        responseListener.kill();
86                        // TODO proxies = null; ??
87                }
88                // Stop all the remote objects working
89                for (String reference : remoteObjs.keySet()) {
90                        unbind(reference);
91                }
92
93                // Close the connection once all the listeners are died
94                closeConnection();
95
96                clientStarted = false;
97                connectionClosed = false;
98                environment = null;
99                remoteObjs = null;
100        }
101
102        /**
103         * @return Broker's connection
104         * @throws Exception
105         */
106        public Connection getConnection() throws Exception {
107                return connection;
108        }
109
110        /**
111         * This method close the broker's connection
112         *
113         * @throws IOException
114         */
115        public void closeConnection() throws IOException {
116                logger.warn("Clossing connection");
117                connectionClosed = true;
118                connection.close();
119                connectionClosed = false;
120        }
121
122        /**
123         * Return the broker's channel
124         *
125         * @return Broker's channel
126         * @throws Exception
127         */
128        public synchronized Channel getChannel() throws Exception {
129                return channel;
130        }
131
132        /**
133         * Creates a new channel using the Broker's connection
134         *
135         * @return newChannel
136         * @throws IOException
137         */
138        public synchronized Channel getNewChannel() throws IOException {
139                return connection.createChannel();
140        }
141
142        /**
143         *
144         */
145        public synchronized void publishMessge(String exchange, String routingKey, BasicProperties props, byte[] bytesRequest) throws IOException {
146                if (!channel.isOpen()) {
147                        logger.error("Broker's channel is closed opening a new one", channel.getCloseReason());
148                        channel = getNewChannel();
149                }
150                channel.basicPublish(exchange, routingKey, props, bytesRequest);
151        }
152
153        /**
154         * Returns the remote object for specified reference.
155         *
156         * @param reference
157         *            - Binding name
158         * @param contract
159         *            - Remote Interface
160         * @return newProxy
161         * @throws RemoteException
162         */
163        @SuppressWarnings("unchecked")
164        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
165                try {
166
167                        if (!clientStarted) {
168                                initClient();
169                        }
170
171                        if (!proxies.containsKey(reference)) {
172                                Proxymq proxy = new Proxymq(reference, contract, this);
173                                Class<?>[] array = { contract };
174                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
175                                proxies.put(reference, newProxy);
176                                return (T) newProxy;
177                        }
178                        return (T) proxies.get(reference);
179
180                } catch (Exception e) {
181                        throw new RemoteException(e);
182                }
183        }
184
185        /**
186         * Returns the remote object for specified reference. This function returns
187         * an special type of proxy, every method invoked will be multi and
188         * asynchronous.
189         *
190         * @param reference
191         *            - Binding name
192         * @param contract
193         *            - Remote Interface
194         * @return newProxy
195         * @throws RemoteException
196         */
197        @SuppressWarnings("unchecked")
198        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
199                try {
200                        if (!multiProxies.containsKey(reference)) {
201                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
202                                Class<?>[] array = { contract };
203                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
204                                multiProxies.put(reference, newProxy);
205                                return (T) newProxy;
206                        }
207                        return (T) multiProxies.get(reference);
208
209                } catch (Exception e) {
210                        throw new RemoteException(e);
211                }
212        }
213
214        /**
215         * Binds the reference to the specified remote object. This function uses
216         * the broker's environment
217         *
218         * @param reference
219         *            - Binding name
220         * @param remote
221         *            - RemoteObject to bind
222         * @throws RemoteException
223         *             If the remote operation failed
224         * @throws AlreadyBoundException
225         *             If name is already bound.
226         */
227        public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException {
228                bind(reference, remote, environment);
229        }
230
231        /**
232         * Binds the reference to the specified remote object. This function uses
233         * the broker's environment
234         *
235         * @param reference
236         *            - Binding name
237         * @param remote
238         *            - RemoteObject to bind
239         * @param env
240         *            - RemoteObject environment. You can set how many threads will
241         *            be listen to the reference, the multiqueue name and the
242         *            properties of the object queue and multiqueue
243         * @throws RemoteException
244         *             If the remote operation failed
245         * @throws AlreadyBoundException
246         *             If name is already bound.
247         */
248        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
249                if (remoteObjs.containsKey(reference)) {
250                        throw new AlreadyBoundException(reference);
251                }
252                // Try to start the remtoeObject listeners
253                try {
254                        remote.startRemoteObject(reference, this, env);
255                        remoteObjs.put(reference, remote);
256                } catch (Exception e) {
257                        throw new RemoteException(e);
258                }
259        }
260
261        /**
262         * Unbinds a remoteObject from its reference and kills all the threads
263         * created.
264         *
265         * @param reference
266         *            - Binding name
267         * @throws RemoteException
268         *             If the remote operation failed
269         * @throws IOException
270         *             If there are problems while killing the threads
271         */
272        public void unbind(String reference) throws RemoteException, IOException {
273                if (remoteObjs.containsKey(reference)) {
274                        RemoteObject remote = remoteObjs.get(reference);
275                        remote.kill();
276                } else {
277                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
278                }
279
280        }
281
282        /**
283         * This method ensures the client will have only one ResponseListener.
284         *
285         * @throws Exception
286         */
287        private synchronized void initClient() throws Exception {
288                if (responseListener == null) {
289                        responseListener = new ResponseListener(this);
290                        responseListener.start();
291                        clientStarted = true;
292                }
293        }
294
295        /**
296         * This function is used to send a ping message to see if the connection
297         * works
298         *
299         * @param env
300         * @throws Exception
301         */
302        public void tryConnection(Properties env) throws Exception {
303                Channel channel = connection.createChannel();
304                String message = "ping";
305
306                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
307                String queueName = exchange;
308                String routingKey = "routingKey";
309
310                channel.exchangeDeclare(exchange, "direct");
311                channel.queueDeclare(queueName, false, false, false, null);
312                channel.queueBind(queueName, exchange, routingKey);
313
314                channel.basicPublish(exchange, routingKey, null, message.getBytes());
315
316                QueueingConsumer consumer = new QueueingConsumer(channel);
317
318                channel.basicConsume(queueName, true, consumer);
319                Delivery delivery = consumer.nextDelivery(1000);
320
321                channel.exchangeDelete(exchange);
322                channel.queueDelete(queueName);
323
324                channel.close();
325
326                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
327                        throw new IOException("Ping initialitzation has failed");
328                }
329        }
330
331        /**
332         * This method adds a ShutdownListener to the Broker's connection. When this
333         * connection falls, a new connection will be created and this will also
334         * have the listener.
335         */
336        private void addFaultTolerance() {
337                connection.addShutdownListener(new ShutdownListener() {
338                        @Override
339                        public void shutdownCompleted(ShutdownSignalException cause) {
340                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
341                                if (!connectionClosed)
342                                        if (cause.isHardError()) {
343                                                if (connection.isOpen()) {
344                                                        try {
345                                                                connection.close();
346                                                        } catch (IOException e) {
347                                                                e.printStackTrace();
348                                                        }
349                                                }
350                                                try {
351                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
352                                                        channel = connection.createChannel();
353                                                        addFaultTolerance();
354                                                } catch (Exception e) {
355                                                        e.printStackTrace();
356                                                }
357                                        } else {
358                                                Channel channel = (Channel) cause.getReference();
359                                                if (channel.isOpen()) {
360                                                        try {
361                                                                channel.close();
362                                                        } catch (IOException e) {
363                                                                e.printStackTrace();
364                                                        }
365                                                }
366                                        }
367                        }
368                });
369        }
370
371        public Properties getEnvironment() {
372                return environment;
373        }
374
375        public ResponseListener getResponseListener() {
376                return responseListener;
377        }
378
379        public Serializer getSerializer() {
380                return serializer;
381        }
382}
Note: See TracBrowser for help on using the repository browser.