source: branches/supervisor/src/main/java/omq/common/broker/Broker.java @ 92

Last change on this file since 92 was 92, checked in by stoda, 11 years ago

TODO: delete in supervisor
check the code

File size: 10.6 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.lang.reflect.Proxy;
5import java.net.URL;
6import java.util.HashMap;
7import java.util.Hashtable;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
13import omq.client.proxy.MultiProxymq;
14import omq.client.proxy.Proxymq;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.AlreadyBoundException;
19import omq.exception.InitBrokerException;
20import omq.exception.RemoteException;
21import omq.server.RemoteObject;
22import omq.supervisor.Supervisor;
23
24import org.apache.log4j.Logger;
25import org.apache.log4j.xml.DOMConfigurator;
26
27import com.rabbitmq.client.Channel;
28import com.rabbitmq.client.Connection;
29import com.rabbitmq.client.QueueingConsumer;
30import com.rabbitmq.client.QueueingConsumer.Delivery;
31import com.rabbitmq.client.ShutdownListener;
32import com.rabbitmq.client.ShutdownSignalException;
33
34/**
35 * A "broker" allows a new connection to a RabbitMQ server. Under this
36 * connection it can have binded object and proxies.
37 *
38 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
39 *
40 */
41public class Broker {
42
43        private static final Logger logger = Logger.getLogger(Broker.class.getName());
44
45        private Connection connection;
46        private Channel channel;
47        private ResponseListener responseListener;
48        private Serializer serializer;
49        private boolean clientStarted = false;
50        private boolean connectionClosed = false;
51        private Properties environment = null;
52        private Map<String, RemoteObject> remoteObjs;
53        private Map<String, Object> proxies = new Hashtable<String, Object>();
54        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
55
56        // Supervisor
57        private Supervisor supervisor;
58
59        public Broker(Properties env) throws Exception {
60                // Load log4j configuration
61                URL log4jResource = Broker.class.getResource("/log4j.xml");
62                DOMConfigurator.configure(log4jResource);
63
64                remoteObjs = new HashMap<String, RemoteObject>();
65                serializer = new Serializer(env);
66                environment = env;
67                connection = OmqConnectionFactory.getNewConnection(env);
68                channel = connection.createChannel();
69                addFaultTolerance();
70                try {
71                        tryConnection(env);
72                } catch (Exception e) {
73                        channel.close();
74                        connection.close();
75                        throw new InitBrokerException("The connection didn't work");
76                }
77        }
78
79        /**
80         * This method stops the broker's connection and all the threads created
81         *
82         * @throws Exception
83         */
84        public void stopBroker() throws Exception {
85                logger.warn("Stopping broker");
86                // Stop the client
87                if (clientStarted) {
88                        responseListener.kill();
89                        // TODO proxies = null; ??
90                }
91                // Stop all the remote objects working
92                for (String reference : remoteObjs.keySet()) {
93                        unbind(reference);
94                }
95
96                // Close the connection once all the listeners are died
97                closeConnection();
98
99                clientStarted = false;
100                connectionClosed = false;
101                environment = null;
102                remoteObjs = null;
103        }
104
105        /**
106         * @return Broker's connection
107         * @throws Exception
108         */
109        public Connection getConnection() throws Exception {
110                return connection;
111        }
112
113        /**
114         * This method close the broker's connection
115         *
116         * @throws IOException
117         */
118        public void closeConnection() throws IOException {
119                logger.warn("Clossing connection");
120                connectionClosed = true;
121                connection.close();
122                connectionClosed = false;
123        }
124
125        /**
126         * Return the broker's channel
127         *
128         * @return Broker's channel
129         * @throws Exception
130         */
131        public Channel getChannel() throws Exception {
132                return channel;
133        }
134
135        /**
136         * Creates a new channel using the Broker's connection
137         *
138         * @return newChannel
139         * @throws IOException
140         */
141        public Channel getNewChannel() throws IOException {
142                return connection.createChannel();
143        }
144
145        /**
146         * Returns the remote object for specified reference.
147         *
148         * @param reference
149         *            - Binding name
150         * @param contract
151         *            - Remote Interface
152         * @return newProxy
153         * @throws RemoteException
154         */
155        @SuppressWarnings("unchecked")
156        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
157                try {
158
159                        if (!clientStarted) {
160                                initClient();
161                        }
162
163                        if (!proxies.containsKey(reference)) {
164                                Proxymq proxy = new Proxymq(reference, contract, this);
165                                Class<?>[] array = { contract };
166                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
167                                proxies.put(reference, newProxy);
168                                return (T) newProxy;
169                        }
170                        return (T) proxies.get(reference);
171
172                } catch (Exception e) {
173                        throw new RemoteException(e);
174                }
175        }
176
177        /**
178         * Returns the remote object for specified reference. This function returns
179         * an special type of proxy, every method invoked will be multi and
180         * asynchronous.
181         *
182         * @param reference
183         *            - Binding name
184         * @param contract
185         *            - Remote Interface
186         * @return newProxy
187         * @throws RemoteException
188         */
189        @SuppressWarnings("unchecked")
190        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
191                try {
192                        if (!multiProxies.containsKey(reference)) {
193                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
194                                Class<?>[] array = { contract };
195                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
196                                multiProxies.put(reference, newProxy);
197                                return (T) newProxy;
198                        }
199                        return (T) multiProxies.get(reference);
200
201                } catch (Exception e) {
202                        throw new RemoteException(e);
203                }
204        }
205
206        /**
207         * Binds the reference to the specified remote object. This function uses
208         * the broker's environment
209         *
210         * @param reference
211         *            - Binding name
212         * @param remote
213         *            - RemoteObject to bind
214         * @throws RemoteException
215         *             If the remote operation failed
216         * @throws AlreadyBoundException
217         *             If name is already bound.
218         */
219        public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException {
220                bind(reference, remote, environment);
221        }
222
223        /**
224         * Binds the reference to the specified remote object. This function uses
225         * the broker's environment
226         *
227         * @param reference
228         *            - Binding name
229         * @param remote
230         *            - RemoteObject to bind
231         * @param env
232         *            - RemoteObject environment. You can set how many threads will
233         *            be listen to the reference, the multiqueue name and the
234         *            properties of the object queue and multiqueue
235         * @throws RemoteException
236         *             If the remote operation failed
237         * @throws AlreadyBoundException
238         *             If name is already bound.
239         */
240        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
241                if (remoteObjs.containsKey(reference)) {
242                        throw new AlreadyBoundException(reference);
243                }
244                // Try to start the remtoeObject listeners
245                try {
246                        remote.startRemoteObject(reference, this, env);
247                        remoteObjs.put(reference, remote);
248                } catch (Exception e) {
249                        throw new RemoteException(e);
250                }
251        }
252
253        /**
254         * Unbinds a remoteObject from its reference and kills all the threads
255         * created.
256         *
257         * @param reference
258         *            - Binding name
259         * @throws RemoteException
260         *             If the remote operation failed
261         * @throws IOException
262         *             If there are problems while killing the threads
263         */
264        public void unbind(String reference) throws RemoteException, IOException {
265                if (remoteObjs.containsKey(reference)) {
266                        RemoteObject remote = remoteObjs.get(reference);
267                        remote.kill();
268                } else {
269                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
270                }
271
272        }
273
274        /**
275         * This method ensures the client will have only one ResponseListener.
276         *
277         * @throws Exception
278         */
279        private synchronized void initClient() throws Exception {
280                if (responseListener == null) {
281                        responseListener = new ResponseListener(this);
282                        responseListener.start();
283                        clientStarted = true;
284                }
285        }
286
287        /**
288         * This function is used to send a ping message to see if the connection
289         * works
290         *
291         * @param env
292         * @throws Exception
293         */
294        public void tryConnection(Properties env) throws Exception {
295                Channel channel = connection.createChannel();
296                String message = "ping";
297
298                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
299                String queueName = exchange;
300                String routingKey = "routingKey";
301
302                channel.exchangeDeclare(exchange, "direct");
303                channel.queueDeclare(queueName, false, false, false, null);
304                channel.queueBind(queueName, exchange, routingKey);
305
306                channel.basicPublish(exchange, routingKey, null, message.getBytes());
307
308                QueueingConsumer consumer = new QueueingConsumer(channel);
309
310                channel.basicConsume(queueName, true, consumer);
311                Delivery delivery = consumer.nextDelivery(1000);
312
313                channel.exchangeDelete(exchange);
314                channel.queueDelete(queueName);
315
316                channel.close();
317
318                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
319                        throw new IOException("Ping initialitzation has failed");
320                }
321        }
322
323        /**
324         * This method adds a ShutdownListener to the Broker's connection. When this
325         * connection falls, a new connection will be created and this will also
326         * have the listener.
327         */
328        private void addFaultTolerance() {
329                connection.addShutdownListener(new ShutdownListener() {
330                        @Override
331                        public void shutdownCompleted(ShutdownSignalException cause) {
332                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
333                                if (!connectionClosed)
334                                        if (cause.isHardError()) {
335                                                if (connection.isOpen()) {
336                                                        try {
337                                                                connection.close();
338                                                        } catch (IOException e) {
339                                                                e.printStackTrace();
340                                                        }
341                                                }
342                                                try {
343                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
344                                                        channel = connection.createChannel();
345                                                        addFaultTolerance();
346                                                } catch (Exception e) {
347                                                        e.printStackTrace();
348                                                }
349                                        } else {
350                                                Channel channel = (Channel) cause.getReference();
351                                                if (channel.isOpen()) {
352                                                        try {
353                                                                channel.close();
354                                                        } catch (IOException e) {
355                                                                e.printStackTrace();
356                                                        }
357                                                }
358                                        }
359                        }
360                });
361        }
362
363        public Properties getEnvironment() {
364                return environment;
365        }
366
367        public ResponseListener getResponseListener() {
368                return responseListener;
369        }
370
371        public Serializer getSerializer() {
372                return serializer;
373        }
374
375        public Map<String, RemoteObject> getRemoteObjs() {
376                return remoteObjs;
377        }
378
379        /*
380         * Supervisor
381         */
382        public void setSupervisor(String supervisorName, String brokerName) throws Exception {
383                // Create a RemoteBrokerImpl
384                bind(brokerName, new RemoteBrokerImpl());
385                // Subscribe broker
386                supervisor = lookup(supervisorName, Supervisor.class);
387                supervisor.subscribe(brokerName);
388                logger.info("Supervisor set: " + supervisorName + ", BrokerName: " + brokerName);
389        }
390
391        public Supervisor getSupervisor() {
392                return supervisor;
393        }
394}
Note: See TracBrowser for help on using the repository browser.