source: branches/supervisor/src/main/java/omq/common/broker/Broker.java @ 111

Last change on this file since 111 was 111, checked in by stoda, 11 years ago

Broker:

tryconnection won't be used
DOMConfigurator won't be used


File size: 12.1 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.lang.reflect.Proxy;
5import java.util.HashMap;
6import java.util.Hashtable;
7import java.util.Map;
8import java.util.Properties;
9
10import omq.Remote;
11import omq.client.listener.ResponseListener;
12import omq.client.proxy.MultiProxymq;
13import omq.client.proxy.Proxymq;
14import omq.common.util.OmqConnectionFactory;
15import omq.common.util.ParameterQueue;
16import omq.common.util.Serializer;
17import omq.exception.AlreadyBoundException;
18import omq.exception.InitBrokerException;
19import omq.exception.RemoteException;
20import omq.server.RemoteObject;
21import omq.supervisor.Supervisor;
22
23import org.apache.log4j.Logger;
24
25import com.rabbitmq.client.AMQP.BasicProperties;
26import com.rabbitmq.client.Channel;
27import com.rabbitmq.client.Connection;
28import com.rabbitmq.client.QueueingConsumer;
29import com.rabbitmq.client.QueueingConsumer.Delivery;
30import com.rabbitmq.client.ShutdownListener;
31import com.rabbitmq.client.ShutdownSignalException;
32
33/**
34 * A "broker" allows a new connection to a RabbitMQ server. Under this
35 * connection it can have binded object and proxies.
36 *
37 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
38 *
39 */
40public class Broker {
41
42        private static final Logger logger = Logger.getLogger(Broker.class.getName());
43
44        private Connection connection;
45        private Channel channel;
46        private ResponseListener responseListener;
47        private Serializer serializer;
48        private boolean clientStarted = false;
49        private boolean connectionClosed = false;
50        private Properties environment = null;
51        private Map<String, RemoteObject> remoteObjs;
52        private Map<String, Object> proxies = new Hashtable<String, Object>();
53        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
54
55        // Supervisor
56        private Supervisor supervisor;
57
58        public Broker(Properties env) throws Exception {
59                // Load log4j configuration
60                // URL log4jResource = Broker.class.getResource("/log4j.xml");
61                // DOMConfigurator.configure(log4jResource);
62
63                remoteObjs = new HashMap<String, RemoteObject>();
64                serializer = new Serializer(env);
65                environment = env;
66                connection = OmqConnectionFactory.getNewConnection(env);
67                channel = connection.createChannel();
68                addFaultTolerance();
69                if (!connection.isOpen() || !channel.isOpen()) {
70                        if (connection.isOpen()) {
71                                connection.close();
72                        }
73                        throw new InitBrokerException("The connection didn't work");
74                }
75
76                // try {
77                // tryConnection(env);
78                // } catch (Exception e) {
79                // channel.close();
80                // connection.close();
81                // throw new InitBrokerException("The connection didn't work");
82                // }
83        }
84
85        /**
86         * This method stops the broker's connection and all the threads created
87         *
88         * @throws Exception
89         */
90        public void stopBroker() throws Exception {
91                logger.warn("Stopping broker");
92                // Stop the client
93                if (clientStarted) {
94                        responseListener.kill();
95                        // TODO proxies = null; ??
96                }
97                // Stop all the remote objects working
98                for (String reference : remoteObjs.keySet()) {
99                        unbind(reference);
100                }
101
102                // Close the connection once all the listeners are died
103                closeConnection();
104
105                clientStarted = false;
106                // connectionClosed = false;
107                environment = null;
108                remoteObjs = null;
109        }
110
111        /**
112         * @return Broker's connection
113         * @throws Exception
114         */
115        public Connection getConnection() throws Exception {
116                return connection;
117        }
118
119        /**
120         * This method close the broker's connection
121         *
122         * @throws IOException
123         */
124        public void closeConnection() throws IOException {
125                logger.warn("Clossing connection");
126                connectionClosed = true;
127                connection.close();
128                // connectionClosed = false;
129        }
130
131        /**
132         * Return the broker's channel
133         *
134         * @return Broker's channel
135         * @throws Exception
136         */
137        public synchronized Channel getChannel() throws Exception {
138                return channel;
139        }
140
141        /**
142         * Creates a new channel using the Broker's connection
143         *
144         * @return newChannel
145         * @throws IOException
146         */
147        public synchronized Channel getNewChannel() throws IOException {
148                return connection.createChannel();
149        }
150
151        /**
152         * Publishes a request using a thread safe channel -if the channel is closed
153         * it tries to open it once
154         *
155         * @param exchange
156         * @param routingKey
157         * @param props
158         * @param bytesRequest
159         * @throws IOException
160         */
161        public synchronized void publishMessge(String exchange, String routingKey, BasicProperties props, byte[] bytesRequest)
162                        throws IOException {
163                if (!channel.isOpen()) {
164                        logger.error("Broker's channel is closed opening a new one", channel.getCloseReason());
165                        channel = getNewChannel();
166                }
167                channel.basicPublish(exchange, routingKey, props, bytesRequest);
168        }
169
170        /**
171         * Returns the remote object for specified reference.
172         *
173         * @param reference
174         *            - Binding name
175         * @param contract
176         *            - Remote Interface
177         * @return newProxy
178         * @throws RemoteException
179         */
180        @SuppressWarnings("unchecked")
181        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
182                try {
183
184                        if (!clientStarted) {
185                                initClient();
186                        }
187
188                        if (!proxies.containsKey(reference)) {
189                                Proxymq proxy = new Proxymq(reference, contract, this);
190                                Class<?>[] array = { contract };
191                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
192                                proxies.put(reference, newProxy);
193                                return (T) newProxy;
194                        }
195                        return (T) proxies.get(reference);
196
197                } catch (Exception e) {
198                        throw new RemoteException(e);
199                }
200        }
201
202        /**
203         * Returns the remote object for specified reference. This function returns
204         * an special type of proxy, every method invoked will be multi and
205         * asynchronous.
206         *
207         * @param reference
208         *            - Binding name
209         * @param contract
210         *            - Remote Interface
211         * @return newProxy
212         * @throws RemoteException
213         */
214        @SuppressWarnings("unchecked")
215        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
216                try {
217                        if (!multiProxies.containsKey(reference)) {
218                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
219                                Class<?>[] array = { contract };
220                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
221                                multiProxies.put(reference, newProxy);
222                                return (T) newProxy;
223                        }
224                        return (T) multiProxies.get(reference);
225
226                } catch (Exception e) {
227                        throw new RemoteException(e);
228                }
229        }
230
231        /**
232         * Binds the reference to the specified remote object. This function uses
233         * the broker's environment
234         *
235         * @param reference
236         *            - Binding name
237         * @param remote
238         *            - RemoteObject to bind
239         * @throws RemoteException
240         *             If the remote operation failed
241         * @throws AlreadyBoundException
242         *             If name is already bound.
243         */
244        public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException {
245                bind(reference, remote, environment);
246        }
247
248        public void bind(String reference, String UID, RemoteObject remote) throws RemoteException, AlreadyBoundException {
249                bind(reference, UID, remote, environment);
250        }
251
252        /**
253         * Binds the reference to the specified remote object. This function uses
254         * the broker's environment
255         *
256         * @param reference
257         *            - Binding name
258         * @param remote
259         *            - RemoteObject to bind
260         * @param env
261         *            - RemoteObject environment. You can set how many threads will
262         *            be listen to the reference, the multiqueue name and the
263         *            properties of the object queue and multiqueue
264         * @throws RemoteException
265         *             If the remote operation failed
266         * @throws AlreadyBoundException
267         *             If name is already bound.
268         */
269        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
270                if (remoteObjs.containsKey(reference)) {
271                        throw new AlreadyBoundException(reference);
272                }
273                // Try to start the remtoeObject listeners
274                try {
275                        remote.startRemoteObject(reference, this, env);
276                        remoteObjs.put(reference, remote);
277                } catch (Exception e) {
278                        throw new RemoteException(e);
279                }
280        }
281
282        public void bind(String reference, String UID, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
283                if (remoteObjs.containsKey(reference)) {
284                        throw new AlreadyBoundException(reference);
285                }
286                // Try to start the remtoeObject listeners
287                try {
288                        remote.startRemoteObject(reference, UID, this, env);
289                        remoteObjs.put(reference, remote);
290                } catch (Exception e) {
291                        throw new RemoteException(e);
292                }
293        }
294
295        /**
296         * Unbinds a remoteObject from its reference and kills all the threads
297         * created.
298         *
299         * @param reference
300         *            - Binding name
301         * @throws RemoteException
302         *             If the remote operation failed
303         * @throws IOException
304         *             If there are problems while killing the threads
305         */
306        public void unbind(String reference) throws RemoteException, IOException {
307                if (remoteObjs.containsKey(reference)) {
308                        RemoteObject remote = remoteObjs.get(reference);
309                        remote.kill();
310                        remoteObjs.remove(reference);
311                } else {
312                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
313                }
314
315        }
316
317        /**
318         * This method ensures the client will have only one ResponseListener.
319         *
320         * @throws Exception
321         */
322        private synchronized void initClient() throws Exception {
323                if (responseListener == null) {
324                        responseListener = new ResponseListener(this);
325                        responseListener.start();
326                        clientStarted = true;
327                }
328        }
329
330        /**
331         * This function is used to send a ping message to see if the connection
332         * works
333         *
334         * @param env
335         * @throws Exception
336         */
337        public void tryConnection(Properties env) throws Exception {
338                Channel channel = connection.createChannel();
339                String message = "ping";
340
341                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
342                String queueName = exchange;
343                String routingKey = "routingKey";
344
345                channel.exchangeDeclare(exchange, "direct");
346                channel.queueDeclare(queueName, false, false, false, null);
347                channel.queueBind(queueName, exchange, routingKey);
348
349                channel.basicPublish(exchange, routingKey, null, message.getBytes());
350
351                QueueingConsumer consumer = new QueueingConsumer(channel);
352
353                channel.basicConsume(queueName, true, consumer);
354                Delivery delivery = consumer.nextDelivery(1000);
355
356                channel.exchangeDelete(exchange);
357                channel.queueDelete(queueName);
358
359                channel.close();
360
361                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
362                        throw new IOException("Ping initialitzation has failed");
363                }
364        }
365
366        /**
367         * This method adds a ShutdownListener to the Broker's connection. When this
368         * connection falls, a new connection will be created and this will also
369         * have the listener.
370         */
371        private void addFaultTolerance() {
372                connection.addShutdownListener(new ShutdownListener() {
373                        @Override
374                        public void shutdownCompleted(ShutdownSignalException cause) {
375                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
376                                if (!connectionClosed)
377                                        if (cause.isHardError()) {
378                                                if (connection.isOpen()) {
379                                                        try {
380                                                                connection.close();
381                                                        } catch (IOException e) {
382                                                                e.printStackTrace();
383                                                        }
384                                                }
385                                                try {
386                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
387                                                        channel = connection.createChannel();
388                                                        addFaultTolerance();
389                                                } catch (Exception e) {
390                                                        e.printStackTrace();
391                                                }
392                                        } else {
393                                                Channel channel = (Channel) cause.getReference();
394                                                if (channel.isOpen()) {
395                                                        try {
396                                                                channel.close();
397                                                        } catch (IOException e) {
398                                                                e.printStackTrace();
399                                                        }
400                                                }
401                                        }
402                        }
403                });
404        }
405
406        public Properties getEnvironment() {
407                return environment;
408        }
409
410        public ResponseListener getResponseListener() {
411                return responseListener;
412        }
413
414        public Serializer getSerializer() {
415                return serializer;
416        }
417
418        public Map<String, RemoteObject> getRemoteObjs() {
419                return remoteObjs;
420        }
421
422        /*
423         * Supervisor
424         */
425        public void setSupervisor(String supervisorName, String brokerSet, String brokerName) throws Exception {
426                // Create a RemoteBrokerImpl
427                bind(brokerSet, brokerName, new RemoteBrokerImpl());
428                // Subscribe broker
429                supervisor = lookup(supervisorName, Supervisor.class);
430                supervisor.subscribe(brokerSet, brokerName);
431                logger.info("Supervisor set: " + supervisorName + ", BrokerSet: " + brokerSet + ", BrokerName: " + brokerName);
432        }
433
434        public Supervisor getSupervisor() {
435                return supervisor;
436        }
437}
Note: See TracBrowser for help on using the repository browser.