source: trunk/src/main/java/omq/common/broker/Broker.java @ 83

Last change on this file since 83 was 83, checked in by stoda, 11 years ago

J

File size: 10.0 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.lang.reflect.Proxy;
5import java.net.URL;
6import java.util.HashMap;
7import java.util.Hashtable;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
13import omq.client.proxy.MultiProxymq;
14import omq.client.proxy.Proxymq;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.AlreadyBoundException;
19import omq.exception.InitBrokerException;
20import omq.exception.RemoteException;
21import omq.server.RemoteObject;
22
23import org.apache.log4j.Logger;
24import org.apache.log4j.xml.DOMConfigurator;
25
26import com.rabbitmq.client.Channel;
27import com.rabbitmq.client.Connection;
28import com.rabbitmq.client.QueueingConsumer;
29import com.rabbitmq.client.QueueingConsumer.Delivery;
30import com.rabbitmq.client.ShutdownListener;
31import com.rabbitmq.client.ShutdownSignalException;
32
33/**
34 * A "broker" allows a new connection to a RabbitMQ server. Under this
35 * connection it can have binded object and proxies.
36 *
37 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
38 *
39 */
40public class Broker {
41
42        private static final Logger logger = Logger.getLogger(Broker.class.getName());
43
44        private Connection connection;
45        private Channel channel;
46        private ResponseListener responseListener;
47        private Serializer serializer;
48        private boolean clientStarted = false;
49        private boolean connectionClosed = false;
50        private Properties environment = null;
51        private Map<String, RemoteObject> remoteObjs;
52        private Map<String, Object> proxies = new Hashtable<String, Object>();
53        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
54
55        public Broker(Properties env) throws Exception {
56                // Load log4j configuration
57                URL log4jResource = Broker.class.getResource("/log4j.xml");
58                DOMConfigurator.configure(log4jResource);
59
60                remoteObjs = new HashMap<String, RemoteObject>();
61                serializer = new Serializer(env);
62                environment = env;
63                connection = OmqConnectionFactory.getNewConnection(env);
64                channel = connection.createChannel();
65                addFaultTolerance();
66                try {
67                        tryConnection(env);
68                } catch (Exception e) {
69                        channel.close();
70                        connection.close();
71                        throw new InitBrokerException("The connection didn't work");
72                }
73        }
74
75        /**
76         * This method stops the broker's connection and all the threads created
77         *
78         * @throws Exception
79         */
80        public void stopBroker() throws Exception {
81                logger.warn("Stopping broker");
82                // Stop the client
83                if (clientStarted) {
84                        responseListener.kill();
85                        // TODO proxies = null; ??
86                }
87                // Stop all the remote objects working
88                for (String reference : remoteObjs.keySet()) {
89                        unbind(reference);
90                }
91
92                // Close the connection once all the listeners are died
93                closeConnection();
94
95                clientStarted = false;
96                connectionClosed = false;
97                environment = null;
98                remoteObjs = null;
99        }
100
101        /**
102         * @return Broker's connection
103         * @throws Exception
104         */
105        public Connection getConnection() throws Exception {
106                return connection;
107        }
108
109        /**
110         * This method close the broker's connection
111         *
112         * @throws IOException
113         */
114        public void closeConnection() throws IOException {
115                logger.warn("Clossing connection");
116                connectionClosed = true;
117                connection.close();
118                connectionClosed = false;
119        }
120
121        /**
122         * Return the broker's channel
123         *
124         * @return Broker's channel
125         * @throws Exception
126         */
127        public Channel getChannel() throws Exception {
128                return channel;
129        }
130
131        /**
132         * Creates a new channel using the Broker's connection
133         *
134         * @return newChannel
135         * @throws IOException
136         */
137        public Channel getNewChannel() throws IOException {
138                return connection.createChannel();
139        }
140
141        /**
142         * Returns the remote object for specified reference.
143         *
144         * @param reference
145         *            - Binding name
146         * @param contract
147         *            - Remote Interface
148         * @return newProxy
149         * @throws RemoteException
150         */
151        @SuppressWarnings("unchecked")
152        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
153                try {
154
155                        if (!clientStarted) {
156                                initClient();
157                        }
158
159                        if (!proxies.containsKey(reference)) {
160                                Proxymq proxy = new Proxymq(reference, contract, this);
161                                Class<?>[] array = { contract };
162                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
163                                proxies.put(reference, newProxy);
164                                return (T) newProxy;
165                        }
166                        return (T) proxies.get(reference);
167
168                } catch (Exception e) {
169                        throw new RemoteException(e);
170                }
171        }
172
173        /**
174         * Returns the remote object for specified reference. This function returns
175         * an special type of proxy, every method invoked will be multi and
176         * asynchronous.
177         *
178         * @param reference
179         *            - Binding name
180         * @param contract
181         *            - Remote Interface
182         * @return newProxy
183         * @throws RemoteException
184         */
185        @SuppressWarnings("unchecked")
186        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
187                try {
188                        if (!multiProxies.containsKey(reference)) {
189                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
190                                Class<?>[] array = { contract };
191                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
192                                multiProxies.put(reference, newProxy);
193                                return (T) newProxy;
194                        }
195                        return (T) multiProxies.get(reference);
196
197                } catch (Exception e) {
198                        throw new RemoteException(e);
199                }
200        }
201
202        /**
203         * Binds the reference to the specified remote object. This function uses
204         * the broker's environment
205         *
206         * @param reference
207         *            - Binding name
208         * @param remote
209         *            - RemoteObject to bind
210         * @throws RemoteException
211         *             If the remote operation failed
212         * @throws AlreadyBoundException
213         *             If name is already bound.
214         */
215        public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException {
216                bind(reference, remote, environment);
217        }
218
219        /**
220         * Binds the reference to the specified remote object. This function uses
221         * the broker's environment
222         *
223         * @param reference
224         *            - Binding name
225         * @param remote
226         *            - RemoteObject to bind
227         * @param env
228         *            - RemoteObject environment. You can set how many threads will
229         *            be listen to the reference, the multiqueue name and the
230         *            properties of the object queue and multiqueue
231         * @throws RemoteException
232         *             If the remote operation failed
233         * @throws AlreadyBoundException
234         *             If name is already bound.
235         */
236        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
237                if (remoteObjs.containsKey(reference)) {
238                        throw new AlreadyBoundException(reference);
239                }
240                // Try to start the remtoeObject listeners
241                try {
242                        remote.startRemoteObject(reference, this, env);
243                        remoteObjs.put(reference, remote);
244                } catch (Exception e) {
245                        throw new RemoteException(e);
246                }
247        }
248
249        /**
250         * Unbinds a remoteObject from its reference and kills all the threads
251         * created.
252         *
253         * @param reference
254         *            - Binding name
255         * @throws RemoteException
256         *             If the remote operation failed
257         * @throws IOException
258         *             If there are problems while killing the threads
259         */
260        public void unbind(String reference) throws RemoteException, IOException {
261                if (remoteObjs.containsKey(reference)) {
262                        RemoteObject remote = remoteObjs.get(reference);
263                        remote.kill();
264                } else {
265                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
266                }
267
268        }
269
270        /**
271         * This method ensures the client will have only one ResponseListener.
272         *
273         * @throws Exception
274         */
275        private synchronized void initClient() throws Exception {
276                if (responseListener == null) {
277                        responseListener = new ResponseListener(this);
278                        responseListener.start();
279                        clientStarted = true;
280                }
281        }
282
283        /**
284         * This function is used to send a ping message to see if the connection
285         * works
286         *
287         * @param env
288         * @throws Exception
289         */
290        public void tryConnection(Properties env) throws Exception {
291                Channel channel = connection.createChannel();
292                String message = "ping";
293
294                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
295                String queueName = exchange;
296                String routingKey = "routingKey";
297
298                channel.exchangeDeclare(exchange, "direct");
299                channel.queueDeclare(queueName, false, false, false, null);
300                channel.queueBind(queueName, exchange, routingKey);
301
302                channel.basicPublish(exchange, routingKey, null, message.getBytes());
303
304                QueueingConsumer consumer = new QueueingConsumer(channel);
305
306                channel.basicConsume(queueName, true, consumer);
307                Delivery delivery = consumer.nextDelivery(1000);
308
309                channel.exchangeDelete(exchange);
310                channel.queueDelete(queueName);
311
312                channel.close();
313
314                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
315                        throw new IOException("Ping initialitzation has failed");
316                }
317        }
318
319        /**
320         * This method adds a ShutdownListener to the Broker's connection. When this
321         * connection falls, a new connection will be created and this will also
322         * have the listener.
323         */
324        private void addFaultTolerance() {
325                connection.addShutdownListener(new ShutdownListener() {
326                        @Override
327                        public void shutdownCompleted(ShutdownSignalException cause) {
328                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
329                                if (!connectionClosed)
330                                        if (cause.isHardError()) {
331                                                if (connection.isOpen()) {
332                                                        try {
333                                                                connection.close();
334                                                        } catch (IOException e) {
335                                                                e.printStackTrace();
336                                                        }
337                                                }
338                                                try {
339                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
340                                                        channel = connection.createChannel();
341                                                        addFaultTolerance();
342                                                } catch (Exception e) {
343                                                        e.printStackTrace();
344                                                }
345                                        } else {
346                                                Channel channel = (Channel) cause.getReference();
347                                                if (channel.isOpen()) {
348                                                        try {
349                                                                channel.close();
350                                                        } catch (IOException e) {
351                                                                e.printStackTrace();
352                                                        }
353                                                }
354                                        }
355                        }
356                });
357        }
358
359        public Properties getEnvironment() {
360                return environment;
361        }
362
363        public ResponseListener getResponseListener() {
364                return responseListener;
365        }
366
367        public Serializer getSerializer() {
368                return serializer;
369        }
370}
Note: See TracBrowser for help on using the repository browser.