source: trunk/src/main/java/omq/common/broker/Broker.java @ 83

Last change on this file since 83 was 83, checked in by stoda, 11 years ago

J

File size: 10.0 KB
RevLine 
[44]1package omq.common.broker;
2
3import java.io.IOException;
[70]4import java.lang.reflect.Proxy;
[49]5import java.net.URL;
[44]6import java.util.HashMap;
[59]7import java.util.Hashtable;
[44]8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
[70]13import omq.client.proxy.MultiProxymq;
[44]14import omq.client.proxy.Proxymq;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
[83]18import omq.exception.AlreadyBoundException;
[44]19import omq.exception.InitBrokerException;
20import omq.exception.RemoteException;
21import omq.server.RemoteObject;
22
[49]23import org.apache.log4j.Logger;
24import org.apache.log4j.xml.DOMConfigurator;
25
[44]26import com.rabbitmq.client.Channel;
27import com.rabbitmq.client.Connection;
28import com.rabbitmq.client.QueueingConsumer;
29import com.rabbitmq.client.QueueingConsumer.Delivery;
30import com.rabbitmq.client.ShutdownListener;
31import com.rabbitmq.client.ShutdownSignalException;
32
[83]33/**
34 * A "broker" allows a new connection to a RabbitMQ server. Under this
35 * connection it can have binded object and proxies.
36 *
37 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
38 *
39 */
[44]40public class Broker {
[49]41
42        private static final Logger logger = Logger.getLogger(Broker.class.getName());
43
[53]44        private Connection connection;
45        private Channel channel;
46        private ResponseListener responseListener;
47        private Serializer serializer;
48        private boolean clientStarted = false;
49        private boolean connectionClosed = false;
50        private Properties environment = null;
51        private Map<String, RemoteObject> remoteObjs;
[59]52        private Map<String, Object> proxies = new Hashtable<String, Object>();
[70]53        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
[44]54
[53]55        public Broker(Properties env) throws Exception {
56                // Load log4j configuration
57                URL log4jResource = Broker.class.getResource("/log4j.xml");
58                DOMConfigurator.configure(log4jResource);
[49]59
[53]60                remoteObjs = new HashMap<String, RemoteObject>();
61                serializer = new Serializer(env);
62                environment = env;
63                connection = OmqConnectionFactory.getNewConnection(env);
64                channel = connection.createChannel();
65                addFaultTolerance();
66                try {
67                        tryConnection(env);
68                } catch (Exception e) {
69                        channel.close();
70                        connection.close();
71                        throw new InitBrokerException("The connection didn't work");
[44]72                }
73        }
74
[83]75        /**
76         * This method stops the broker's connection and all the threads created
77         *
78         * @throws Exception
79         */
[53]80        public void stopBroker() throws Exception {
[49]81                logger.warn("Stopping broker");
[44]82                // Stop the client
83                if (clientStarted) {
[53]84                        responseListener.kill();
[70]85                        // TODO proxies = null; ??
[44]86                }
87                // Stop all the remote objects working
88                for (String reference : remoteObjs.keySet()) {
89                        unbind(reference);
90                }
[47]91
[44]92                // Close the connection once all the listeners are died
93                closeConnection();
[47]94
95                clientStarted = false;
96                connectionClosed = false;
97                environment = null;
98                remoteObjs = null;
[44]99        }
100
101        /**
102         * @return Broker's connection
103         * @throws Exception
104         */
[53]105        public Connection getConnection() throws Exception {
[44]106                return connection;
107        }
108
[83]109        /**
110         * This method close the broker's connection
111         *
112         * @throws IOException
113         */
[53]114        public void closeConnection() throws IOException {
[49]115                logger.warn("Clossing connection");
[44]116                connectionClosed = true;
117                connection.close();
[47]118                connectionClosed = false;
[44]119        }
120
121        /**
[83]122         * Return the broker's channel
[44]123         *
124         * @return Broker's channel
125         * @throws Exception
126         */
[53]127        public Channel getChannel() throws Exception {
[44]128                return channel;
129        }
130
131        /**
132         * Creates a new channel using the Broker's connection
133         *
134         * @return newChannel
135         * @throws IOException
136         */
[53]137        public Channel getNewChannel() throws IOException {
[44]138                return connection.createChannel();
139        }
140
[83]141        /**
142         * Returns the remote object for specified reference.
143         *
144         * @param reference
145         *            - Binding name
146         * @param contract
147         *            - Remote Interface
148         * @return newProxy
149         * @throws RemoteException
150         */
[44]151        @SuppressWarnings("unchecked")
[59]152        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
[44]153                try {
154
155                        if (!clientStarted) {
[83]156                                initClient();
[44]157                        }
158
[59]159                        if (!proxies.containsKey(reference)) {
[53]160                                Proxymq proxy = new Proxymq(reference, contract, this);
[44]161                                Class<?>[] array = { contract };
[70]162                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
[59]163                                proxies.put(reference, newProxy);
164                                return (T) newProxy;
[44]165                        }
[59]166                        return (T) proxies.get(reference);
[44]167
168                } catch (Exception e) {
169                        throw new RemoteException(e);
170                }
171        }
172
[83]173        /**
174         * Returns the remote object for specified reference. This function returns
175         * an special type of proxy, every method invoked will be multi and
176         * asynchronous.
177         *
178         * @param reference
179         *            - Binding name
180         * @param contract
181         *            - Remote Interface
182         * @return newProxy
183         * @throws RemoteException
184         */
[70]185        @SuppressWarnings("unchecked")
186        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
187                try {
188                        if (!multiProxies.containsKey(reference)) {
189                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
190                                Class<?>[] array = { contract };
191                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
192                                multiProxies.put(reference, newProxy);
193                                return (T) newProxy;
194                        }
195                        return (T) multiProxies.get(reference);
196
197                } catch (Exception e) {
198                        throw new RemoteException(e);
199                }
200        }
201
[83]202        /**
203         * Binds the reference to the specified remote object. This function uses
204         * the broker's environment
205         *
206         * @param reference
207         *            - Binding name
208         * @param remote
209         *            - RemoteObject to bind
210         * @throws RemoteException
211         *             If the remote operation failed
212         * @throws AlreadyBoundException
213         *             If name is already bound.
214         */
215        public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException {
[74]216                bind(reference, remote, environment);
[44]217        }
[70]218
[83]219        /**
220         * Binds the reference to the specified remote object. This function uses
221         * the broker's environment
222         *
223         * @param reference
224         *            - Binding name
225         * @param remote
226         *            - RemoteObject to bind
227         * @param env
228         *            - RemoteObject environment. You can set how many threads will
229         *            be listen to the reference, the multiqueue name and the
230         *            properties of the object queue and multiqueue
231         * @throws RemoteException
232         *             If the remote operation failed
233         * @throws AlreadyBoundException
234         *             If name is already bound.
235         */
236        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
237                if (remoteObjs.containsKey(reference)) {
238                        throw new AlreadyBoundException(reference);
239                }
240                // Try to start the remtoeObject listeners
[66]241                try {
[74]242                        remote.startRemoteObject(reference, this, env);
[66]243                        remoteObjs.put(reference, remote);
244                } catch (Exception e) {
245                        throw new RemoteException(e);
246                }
247        }
[44]248
[83]249        /**
250         * Unbinds a remoteObject from its reference and kills all the threads
251         * created.
252         *
253         * @param reference
254         *            - Binding name
255         * @throws RemoteException
256         *             If the remote operation failed
257         * @throws IOException
258         *             If there are problems while killing the threads
259         */
[53]260        public void unbind(String reference) throws RemoteException, IOException {
[44]261                if (remoteObjs.containsKey(reference)) {
262                        RemoteObject remote = remoteObjs.get(reference);
263                        remote.kill();
264                } else {
265                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
266                }
267
268        }
269
270        /**
[83]271         * This method ensures the client will have only one ResponseListener.
[44]272         *
273         * @throws Exception
274         */
[83]275        private synchronized void initClient() throws Exception {
[53]276                if (responseListener == null) {
277                        responseListener = new ResponseListener(this);
[54]278                        responseListener.start();
[83]279                        clientStarted = true;
[44]280                }
281        }
282
283        /**
284         * This function is used to send a ping message to see if the connection
285         * works
286         *
287         * @param env
288         * @throws Exception
289         */
[53]290        public void tryConnection(Properties env) throws Exception {
[44]291                Channel channel = connection.createChannel();
292                String message = "ping";
293
294                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
295                String queueName = exchange;
296                String routingKey = "routingKey";
297
298                channel.exchangeDeclare(exchange, "direct");
299                channel.queueDeclare(queueName, false, false, false, null);
300                channel.queueBind(queueName, exchange, routingKey);
301
302                channel.basicPublish(exchange, routingKey, null, message.getBytes());
303
304                QueueingConsumer consumer = new QueueingConsumer(channel);
305
306                channel.basicConsume(queueName, true, consumer);
307                Delivery delivery = consumer.nextDelivery(1000);
308
309                channel.exchangeDelete(exchange);
310                channel.queueDelete(queueName);
311
312                channel.close();
313
314                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
315                        throw new IOException("Ping initialitzation has failed");
316                }
317        }
318
319        /**
320         * This method adds a ShutdownListener to the Broker's connection. When this
321         * connection falls, a new connection will be created and this will also
322         * have the listener.
323         */
[53]324        private void addFaultTolerance() {
[44]325                connection.addShutdownListener(new ShutdownListener() {
326                        @Override
327                        public void shutdownCompleted(ShutdownSignalException cause) {
[49]328                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
[44]329                                if (!connectionClosed)
330                                        if (cause.isHardError()) {
331                                                if (connection.isOpen()) {
332                                                        try {
333                                                                connection.close();
334                                                        } catch (IOException e) {
335                                                                e.printStackTrace();
336                                                        }
337                                                }
338                                                try {
[47]339                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
[44]340                                                        channel = connection.createChannel();
341                                                        addFaultTolerance();
342                                                } catch (Exception e) {
343                                                        e.printStackTrace();
344                                                }
345                                        } else {
346                                                Channel channel = (Channel) cause.getReference();
347                                                if (channel.isOpen()) {
348                                                        try {
349                                                                channel.close();
350                                                        } catch (IOException e) {
351                                                                e.printStackTrace();
352                                                        }
353                                                }
354                                        }
355                        }
356                });
357        }
358
[53]359        public Properties getEnvironment() {
[47]360                return environment;
361        }
362
[53]363        public ResponseListener getResponseListener() {
364                return responseListener;
365        }
366
367        public Serializer getSerializer() {
368                return serializer;
369        }
[44]370}
Note: See TracBrowser for help on using the repository browser.