source: branches/supervisor/src/main/java/omq/common/broker/Broker.java @ 106

Last change on this file since 106 was 106, checked in by stoda, 11 years ago

abans que la segueixi liant...

File size: 12.0 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.lang.reflect.Proxy;
5import java.net.URL;
6import java.util.HashMap;
7import java.util.Hashtable;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
13import omq.client.proxy.MultiProxymq;
14import omq.client.proxy.Proxymq;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.AlreadyBoundException;
19import omq.exception.InitBrokerException;
20import omq.exception.RemoteException;
21import omq.server.RemoteObject;
22import omq.supervisor.Supervisor;
23
24import org.apache.log4j.Logger;
25import org.apache.log4j.xml.DOMConfigurator;
26
27import com.rabbitmq.client.Channel;
28import com.rabbitmq.client.Connection;
29import com.rabbitmq.client.QueueingConsumer;
30import com.rabbitmq.client.AMQP.BasicProperties;
31import com.rabbitmq.client.QueueingConsumer.Delivery;
32import com.rabbitmq.client.ShutdownListener;
33import com.rabbitmq.client.ShutdownSignalException;
34
35/**
36 * A "broker" allows a new connection to a RabbitMQ server. Under this
37 * connection it can have binded object and proxies.
38 *
39 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
40 *
41 */
42public class Broker {
43
44        private static final Logger logger = Logger.getLogger(Broker.class.getName());
45
46        private Connection connection;
47        private Channel channel;
48        private ResponseListener responseListener;
49        private Serializer serializer;
50        private boolean clientStarted = false;
51        private boolean connectionClosed = false;
52        private Properties environment = null;
53        private Map<String, RemoteObject> remoteObjs;
54        private Map<String, Object> proxies = new Hashtable<String, Object>();
55        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
56
57        // Supervisor
58        private Supervisor supervisor;
59
60        public Broker(Properties env) throws Exception {
61                // Load log4j configuration
62                URL log4jResource = Broker.class.getResource("/log4j.xml");
63                DOMConfigurator.configure(log4jResource);
64
65                remoteObjs = new HashMap<String, RemoteObject>();
66                serializer = new Serializer(env);
67                environment = env;
68                connection = OmqConnectionFactory.getNewConnection(env);
69                channel = connection.createChannel();
70                addFaultTolerance();
71                try {
72                        tryConnection(env);
73                } catch (Exception e) {
74                        channel.close();
75                        connection.close();
76                        throw new InitBrokerException("The connection didn't work");
77                }
78        }
79
80        /**
81         * This method stops the broker's connection and all the threads created
82         *
83         * @throws Exception
84         */
85        public void stopBroker() throws Exception {
86                logger.warn("Stopping broker");
87                // Stop the client
88                if (clientStarted) {
89                        responseListener.kill();
90                        // TODO proxies = null; ??
91                }
92                // Stop all the remote objects working
93                for (String reference : remoteObjs.keySet()) {
94                        unbind(reference);
95                }
96
97                // Close the connection once all the listeners are died
98                closeConnection();
99
100                clientStarted = false;
101                connectionClosed = false;
102                environment = null;
103                remoteObjs = null;
104        }
105
106        /**
107         * @return Broker's connection
108         * @throws Exception
109         */
110        public Connection getConnection() throws Exception {
111                return connection;
112        }
113
114        /**
115         * This method close the broker's connection
116         *
117         * @throws IOException
118         */
119        public void closeConnection() throws IOException {
120                logger.warn("Clossing connection");
121                connectionClosed = true;
122                connection.close();
123                connectionClosed = false;
124        }
125
126        /**
127         * Return the broker's channel
128         *
129         * @return Broker's channel
130         * @throws Exception
131         */
132        public synchronized Channel getChannel() throws Exception {
133                return channel;
134        }
135
136        /**
137         * Creates a new channel using the Broker's connection
138         *
139         * @return newChannel
140         * @throws IOException
141         */
142        public synchronized Channel getNewChannel() throws IOException {
143                return connection.createChannel();
144        }
145
146        /**
147         * Publishes a request using a thread safe channel -if the channel is closed
148         * it tries to open it once
149         *
150         * @param exchange
151         * @param routingKey
152         * @param props
153         * @param bytesRequest
154         * @throws IOException
155         */
156        public synchronized void publishMessge(String exchange, String routingKey, BasicProperties props, byte[] bytesRequest) throws IOException {
157                if (!channel.isOpen()) {
158                        logger.error("Broker's channel is closed opening a new one", channel.getCloseReason());
159                        channel = getNewChannel();
160                }
161                channel.basicPublish(exchange, routingKey, props, bytesRequest);
162        }
163
164        /**
165         * Returns the remote object for specified reference.
166         *
167         * @param reference
168         *            - Binding name
169         * @param contract
170         *            - Remote Interface
171         * @return newProxy
172         * @throws RemoteException
173         */
174        @SuppressWarnings("unchecked")
175        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
176                try {
177
178                        if (!clientStarted) {
179                                initClient();
180                        }
181
182                        if (!proxies.containsKey(reference)) {
183                                Proxymq proxy = new Proxymq(reference, contract, this);
184                                Class<?>[] array = { contract };
185                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
186                                proxies.put(reference, newProxy);
187                                return (T) newProxy;
188                        }
189                        return (T) proxies.get(reference);
190
191                } catch (Exception e) {
192                        throw new RemoteException(e);
193                }
194        }
195
196        /**
197         * Returns the remote object for specified reference. This function returns
198         * an special type of proxy, every method invoked will be multi and
199         * asynchronous.
200         *
201         * @param reference
202         *            - Binding name
203         * @param contract
204         *            - Remote Interface
205         * @return newProxy
206         * @throws RemoteException
207         */
208        @SuppressWarnings("unchecked")
209        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
210                try {
211                        if (!multiProxies.containsKey(reference)) {
212                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
213                                Class<?>[] array = { contract };
214                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
215                                multiProxies.put(reference, newProxy);
216                                return (T) newProxy;
217                        }
218                        return (T) multiProxies.get(reference);
219
220                } catch (Exception e) {
221                        throw new RemoteException(e);
222                }
223        }
224
225        /**
226         * Binds the reference to the specified remote object. This function uses
227         * the broker's environment
228         *
229         * @param reference
230         *            - Binding name
231         * @param remote
232         *            - RemoteObject to bind
233         * @throws RemoteException
234         *             If the remote operation failed
235         * @throws AlreadyBoundException
236         *             If name is already bound.
237         */
238        public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException {
239                bind(reference, remote, environment);
240        }
241
242        public void bind(String reference, String UID, RemoteObject remote) throws RemoteException, AlreadyBoundException {
243                bind(reference, UID, remote, environment);
244        }
245
246        /**
247         * Binds the reference to the specified remote object. This function uses
248         * the broker's environment
249         *
250         * @param reference
251         *            - Binding name
252         * @param remote
253         *            - RemoteObject to bind
254         * @param env
255         *            - RemoteObject environment. You can set how many threads will
256         *            be listen to the reference, the multiqueue name and the
257         *            properties of the object queue and multiqueue
258         * @throws RemoteException
259         *             If the remote operation failed
260         * @throws AlreadyBoundException
261         *             If name is already bound.
262         */
263        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
264                if (remoteObjs.containsKey(reference)) {
265                        throw new AlreadyBoundException(reference);
266                }
267                // Try to start the remtoeObject listeners
268                try {
269                        remote.startRemoteObject(reference, this, env);
270                        remoteObjs.put(reference, remote);
271                } catch (Exception e) {
272                        throw new RemoteException(e);
273                }
274        }
275
276        public void bind(String reference, String UID, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
277                if (remoteObjs.containsKey(reference)) {
278                        throw new AlreadyBoundException(reference);
279                }
280                // Try to start the remtoeObject listeners
281                try {
282                        remote.startRemoteObject(reference, UID, this, env);
283                        remoteObjs.put(reference, remote);
284                } catch (Exception e) {
285                        throw new RemoteException(e);
286                }
287        }
288
289        /**
290         * Unbinds a remoteObject from its reference and kills all the threads
291         * created.
292         *
293         * @param reference
294         *            - Binding name
295         * @throws RemoteException
296         *             If the remote operation failed
297         * @throws IOException
298         *             If there are problems while killing the threads
299         */
300        public void unbind(String reference) throws RemoteException, IOException {
301                if (remoteObjs.containsKey(reference)) {
302                        RemoteObject remote = remoteObjs.get(reference);
303                        remote.kill();
304                        remoteObjs.remove(reference);
305                } else {
306                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
307                }
308
309        }
310
311        /**
312         * This method ensures the client will have only one ResponseListener.
313         *
314         * @throws Exception
315         */
316        private synchronized void initClient() throws Exception {
317                if (responseListener == null) {
318                        responseListener = new ResponseListener(this);
319                        responseListener.start();
320                        clientStarted = true;
321                }
322        }
323
324        /**
325         * This function is used to send a ping message to see if the connection
326         * works
327         *
328         * @param env
329         * @throws Exception
330         */
331        public void tryConnection(Properties env) throws Exception {
332                Channel channel = connection.createChannel();
333                String message = "ping";
334
335                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
336                String queueName = exchange;
337                String routingKey = "routingKey";
338
339                channel.exchangeDeclare(exchange, "direct");
340                channel.queueDeclare(queueName, false, false, false, null);
341                channel.queueBind(queueName, exchange, routingKey);
342
343                channel.basicPublish(exchange, routingKey, null, message.getBytes());
344
345                QueueingConsumer consumer = new QueueingConsumer(channel);
346
347                channel.basicConsume(queueName, true, consumer);
348                Delivery delivery = consumer.nextDelivery(1000);
349
350                channel.exchangeDelete(exchange);
351                channel.queueDelete(queueName);
352
353                channel.close();
354
355                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
356                        throw new IOException("Ping initialitzation has failed");
357                }
358        }
359
360        /**
361         * This method adds a ShutdownListener to the Broker's connection. When this
362         * connection falls, a new connection will be created and this will also
363         * have the listener.
364         */
365        private void addFaultTolerance() {
366                connection.addShutdownListener(new ShutdownListener() {
367                        @Override
368                        public void shutdownCompleted(ShutdownSignalException cause) {
369                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
370                                if (!connectionClosed)
371                                        if (cause.isHardError()) {
372                                                if (connection.isOpen()) {
373                                                        try {
374                                                                connection.close();
375                                                        } catch (IOException e) {
376                                                                e.printStackTrace();
377                                                        }
378                                                }
379                                                try {
380                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
381                                                        channel = connection.createChannel();
382                                                        addFaultTolerance();
383                                                } catch (Exception e) {
384                                                        e.printStackTrace();
385                                                }
386                                        } else {
387                                                Channel channel = (Channel) cause.getReference();
388                                                if (channel.isOpen()) {
389                                                        try {
390                                                                channel.close();
391                                                        } catch (IOException e) {
392                                                                e.printStackTrace();
393                                                        }
394                                                }
395                                        }
396                        }
397                });
398        }
399
400        public Properties getEnvironment() {
401                return environment;
402        }
403
404        public ResponseListener getResponseListener() {
405                return responseListener;
406        }
407
408        public Serializer getSerializer() {
409                return serializer;
410        }
411
412        public Map<String, RemoteObject> getRemoteObjs() {
413                return remoteObjs;
414        }
415
416        /*
417         * Supervisor
418         */
419        public void setSupervisor(String supervisorName, String brokerSet, String brokerName) throws Exception {
420                // Create a RemoteBrokerImpl
421                bind(brokerSet, brokerName, new RemoteBrokerImpl());
422                // Subscribe broker
423                supervisor = lookup(supervisorName, Supervisor.class);
424                supervisor.subscribe(brokerSet, brokerName);
425                logger.info("Supervisor set: " + supervisorName + ", BrokerSet: " + brokerSet + ", BrokerName: " + brokerName);
426        }
427
428        public Supervisor getSupervisor() {
429                return supervisor;
430        }
431}
Note: See TracBrowser for help on using the repository browser.