source: branches/supervisor/src/main/java/omq/common/broker/Broker.java @ 100

Last change on this file since 100 was 99, checked in by stoda, 11 years ago
File size: 11.3 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.lang.reflect.Proxy;
5import java.net.URL;
6import java.util.HashMap;
7import java.util.Hashtable;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
13import omq.client.proxy.MultiProxymq;
14import omq.client.proxy.Proxymq;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.AlreadyBoundException;
19import omq.exception.InitBrokerException;
20import omq.exception.RemoteException;
21import omq.server.RemoteObject;
22import omq.supervisor.Supervisor;
23
24import org.apache.log4j.Logger;
25import org.apache.log4j.xml.DOMConfigurator;
26
27import com.rabbitmq.client.Channel;
28import com.rabbitmq.client.Connection;
29import com.rabbitmq.client.QueueingConsumer;
30import com.rabbitmq.client.AMQP.BasicProperties;
31import com.rabbitmq.client.QueueingConsumer.Delivery;
32import com.rabbitmq.client.ShutdownListener;
33import com.rabbitmq.client.ShutdownSignalException;
34
35/**
36 * A "broker" allows a new connection to a RabbitMQ server. Under this
37 * connection it can have binded object and proxies.
38 *
39 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
40 *
41 */
42public class Broker {
43
44        private static final Logger logger = Logger.getLogger(Broker.class.getName());
45
46        private Connection connection;
47        private Channel channel;
48        private ResponseListener responseListener;
49        private Serializer serializer;
50        private boolean clientStarted = false;
51        private boolean connectionClosed = false;
52        private Properties environment = null;
53        private Map<String, RemoteObject> remoteObjs;
54        private Map<String, Object> proxies = new Hashtable<String, Object>();
55        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
56
57        // Supervisor
58        private Supervisor supervisor;
59
60        public Broker(Properties env) throws Exception {
61                // Load log4j configuration
62                URL log4jResource = Broker.class.getResource("/log4j.xml");
63                DOMConfigurator.configure(log4jResource);
64
65                remoteObjs = new HashMap<String, RemoteObject>();
66                serializer = new Serializer(env);
67                environment = env;
68                connection = OmqConnectionFactory.getNewConnection(env);
69                channel = connection.createChannel();
70                addFaultTolerance();
71                try {
72                        tryConnection(env);
73                } catch (Exception e) {
74                        channel.close();
75                        connection.close();
76                        throw new InitBrokerException("The connection didn't work");
77                }
78        }
79
80        /**
81         * This method stops the broker's connection and all the threads created
82         *
83         * @throws Exception
84         */
85        public void stopBroker() throws Exception {
86                logger.warn("Stopping broker");
87                // Stop the client
88                if (clientStarted) {
89                        responseListener.kill();
90                        // TODO proxies = null; ??
91                }
92                // Stop all the remote objects working
93                for (String reference : remoteObjs.keySet()) {
94                        unbind(reference);
95                }
96
97                // Close the connection once all the listeners are died
98                closeConnection();
99
100                clientStarted = false;
101                connectionClosed = false;
102                environment = null;
103                remoteObjs = null;
104        }
105
106        /**
107         * @return Broker's connection
108         * @throws Exception
109         */
110        public Connection getConnection() throws Exception {
111                return connection;
112        }
113
114        /**
115         * This method close the broker's connection
116         *
117         * @throws IOException
118         */
119        public void closeConnection() throws IOException {
120                logger.warn("Clossing connection");
121                connectionClosed = true;
122                connection.close();
123                connectionClosed = false;
124        }
125
126        /**
127         * Return the broker's channel
128         *
129         * @return Broker's channel
130         * @throws Exception
131         */
132        public synchronized Channel getChannel() throws Exception {
133                return channel;
134        }
135
136        /**
137         * Creates a new channel using the Broker's connection
138         *
139         * @return newChannel
140         * @throws IOException
141         */
142        public synchronized Channel getNewChannel() throws IOException {
143                return connection.createChannel();
144        }
145
146        /**
147         * Publishes a request using a thread safe channel -if the channel is closed
148         * it tries to open it once
149         *
150         * @param exchange
151         * @param routingKey
152         * @param props
153         * @param bytesRequest
154         * @throws IOException
155         */
156        public synchronized void publishMessge(String exchange, String routingKey, BasicProperties props, byte[] bytesRequest) throws IOException {
157                if (!channel.isOpen()) {
158                        logger.error("Broker's channel is closed opening a new one", channel.getCloseReason());
159                        channel = getNewChannel();
160                }
161                channel.basicPublish(exchange, routingKey, props, bytesRequest);
162        }
163
164        /**
165         * Returns the remote object for specified reference.
166         *
167         * @param reference
168         *            - Binding name
169         * @param contract
170         *            - Remote Interface
171         * @return newProxy
172         * @throws RemoteException
173         */
174        @SuppressWarnings("unchecked")
175        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
176                try {
177
178                        if (!clientStarted) {
179                                initClient();
180                        }
181
182                        if (!proxies.containsKey(reference)) {
183                                Proxymq proxy = new Proxymq(reference, contract, this);
184                                Class<?>[] array = { contract };
185                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
186                                proxies.put(reference, newProxy);
187                                return (T) newProxy;
188                        }
189                        return (T) proxies.get(reference);
190
191                } catch (Exception e) {
192                        throw new RemoteException(e);
193                }
194        }
195
196        /**
197         * Returns the remote object for specified reference. This function returns
198         * an special type of proxy, every method invoked will be multi and
199         * asynchronous.
200         *
201         * @param reference
202         *            - Binding name
203         * @param contract
204         *            - Remote Interface
205         * @return newProxy
206         * @throws RemoteException
207         */
208        @SuppressWarnings("unchecked")
209        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
210                try {
211                        if (!multiProxies.containsKey(reference)) {
212                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
213                                Class<?>[] array = { contract };
214                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
215                                multiProxies.put(reference, newProxy);
216                                return (T) newProxy;
217                        }
218                        return (T) multiProxies.get(reference);
219
220                } catch (Exception e) {
221                        throw new RemoteException(e);
222                }
223        }
224
225        /**
226         * Binds the reference to the specified remote object. This function uses
227         * the broker's environment
228         *
229         * @param reference
230         *            - Binding name
231         * @param remote
232         *            - RemoteObject to bind
233         * @throws RemoteException
234         *             If the remote operation failed
235         * @throws AlreadyBoundException
236         *             If name is already bound.
237         */
238        public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException {
239                bind(reference, remote, environment);
240        }
241
242        /**
243         * Binds the reference to the specified remote object. This function uses
244         * the broker's environment
245         *
246         * @param reference
247         *            - Binding name
248         * @param remote
249         *            - RemoteObject to bind
250         * @param env
251         *            - RemoteObject environment. You can set how many threads will
252         *            be listen to the reference, the multiqueue name and the
253         *            properties of the object queue and multiqueue
254         * @throws RemoteException
255         *             If the remote operation failed
256         * @throws AlreadyBoundException
257         *             If name is already bound.
258         */
259        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
260                if (remoteObjs.containsKey(reference)) {
261                        throw new AlreadyBoundException(reference);
262                }
263                // Try to start the remtoeObject listeners
264                try {
265                        remote.startRemoteObject(reference, this, env);
266                        remoteObjs.put(reference, remote);
267                } catch (Exception e) {
268                        throw new RemoteException(e);
269                }
270        }
271
272        /**
273         * Unbinds a remoteObject from its reference and kills all the threads
274         * created.
275         *
276         * @param reference
277         *            - Binding name
278         * @throws RemoteException
279         *             If the remote operation failed
280         * @throws IOException
281         *             If there are problems while killing the threads
282         */
283        public void unbind(String reference) throws RemoteException, IOException {
284                if (remoteObjs.containsKey(reference)) {
285                        RemoteObject remote = remoteObjs.get(reference);
286                        remote.kill();
287                        remoteObjs.remove(reference);
288                } else {
289                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
290                }
291
292        }
293
294        /**
295         * This method ensures the client will have only one ResponseListener.
296         *
297         * @throws Exception
298         */
299        private synchronized void initClient() throws Exception {
300                if (responseListener == null) {
301                        responseListener = new ResponseListener(this);
302                        responseListener.start();
303                        clientStarted = true;
304                }
305        }
306
307        /**
308         * This function is used to send a ping message to see if the connection
309         * works
310         *
311         * @param env
312         * @throws Exception
313         */
314        public void tryConnection(Properties env) throws Exception {
315                Channel channel = connection.createChannel();
316                String message = "ping";
317
318                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
319                String queueName = exchange;
320                String routingKey = "routingKey";
321
322                channel.exchangeDeclare(exchange, "direct");
323                channel.queueDeclare(queueName, false, false, false, null);
324                channel.queueBind(queueName, exchange, routingKey);
325
326                channel.basicPublish(exchange, routingKey, null, message.getBytes());
327
328                QueueingConsumer consumer = new QueueingConsumer(channel);
329
330                channel.basicConsume(queueName, true, consumer);
331                Delivery delivery = consumer.nextDelivery(1000);
332
333                channel.exchangeDelete(exchange);
334                channel.queueDelete(queueName);
335
336                channel.close();
337
338                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
339                        throw new IOException("Ping initialitzation has failed");
340                }
341        }
342
343        /**
344         * This method adds a ShutdownListener to the Broker's connection. When this
345         * connection falls, a new connection will be created and this will also
346         * have the listener.
347         */
348        private void addFaultTolerance() {
349                connection.addShutdownListener(new ShutdownListener() {
350                        @Override
351                        public void shutdownCompleted(ShutdownSignalException cause) {
352                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
353                                if (!connectionClosed)
354                                        if (cause.isHardError()) {
355                                                if (connection.isOpen()) {
356                                                        try {
357                                                                connection.close();
358                                                        } catch (IOException e) {
359                                                                e.printStackTrace();
360                                                        }
361                                                }
362                                                try {
363                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
364                                                        channel = connection.createChannel();
365                                                        addFaultTolerance();
366                                                } catch (Exception e) {
367                                                        e.printStackTrace();
368                                                }
369                                        } else {
370                                                Channel channel = (Channel) cause.getReference();
371                                                if (channel.isOpen()) {
372                                                        try {
373                                                                channel.close();
374                                                        } catch (IOException e) {
375                                                                e.printStackTrace();
376                                                        }
377                                                }
378                                        }
379                        }
380                });
381        }
382
383        public Properties getEnvironment() {
384                return environment;
385        }
386
387        public ResponseListener getResponseListener() {
388                return responseListener;
389        }
390
391        public Serializer getSerializer() {
392                return serializer;
393        }
394
395        public Map<String, RemoteObject> getRemoteObjs() {
396                return remoteObjs;
397        }
398
399        /*
400         * Supervisor
401         */
402        public void setSupervisor(String supervisorName, String brokerName) throws Exception {
403                // Create a RemoteBrokerImpl
404                bind(brokerName, new RemoteBrokerImpl());
405                // Subscribe broker
406                supervisor = lookup(supervisorName, Supervisor.class);
407                supervisor.subscribe(brokerName);
408                logger.info("Supervisor set: " + supervisorName + ", BrokerName: " + brokerName);
409        }
410
411        public Supervisor getSupervisor() {
412                return supervisor;
413        }
414}
Note: See TracBrowser for help on using the repository browser.