source: trunk/src/main/java/omq/common/broker/Broker.java

Last change on this file was 112, checked in by stoda, 11 years ago

Tryconnection function mades no sense and does not work as expected

File size: 10.6 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.lang.reflect.Proxy;
5import java.util.HashMap;
6import java.util.Hashtable;
7import java.util.Map;
8import java.util.Properties;
9
10import omq.Remote;
11import omq.client.listener.ResponseListener;
12import omq.client.proxy.MultiProxymq;
13import omq.client.proxy.Proxymq;
14import omq.common.util.OmqConnectionFactory;
15import omq.common.util.ParameterQueue;
16import omq.common.util.Serializer;
17import omq.exception.AlreadyBoundException;
18import omq.exception.InitBrokerException;
19import omq.exception.RemoteException;
20import omq.server.RemoteObject;
21
22import org.apache.log4j.Logger;
23
24import com.rabbitmq.client.AMQP.BasicProperties;
25import com.rabbitmq.client.Channel;
26import com.rabbitmq.client.Connection;
27import com.rabbitmq.client.QueueingConsumer;
28import com.rabbitmq.client.QueueingConsumer.Delivery;
29import com.rabbitmq.client.ShutdownListener;
30import com.rabbitmq.client.ShutdownSignalException;
31
32/**
33 * A "broker" allows a new connection to a RabbitMQ server. Under this
34 * connection it can have binded object and proxies.
35 *
36 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
37 *
38 */
39public class Broker {
40
41        private static final Logger logger = Logger.getLogger(Broker.class.getName());
42
43        private Connection connection;
44        private Channel channel;
45        private ResponseListener responseListener;
46        private Serializer serializer;
47        private boolean clientStarted = false;
48        private boolean connectionClosed = false;
49        private Properties environment = null;
50        private Map<String, RemoteObject> remoteObjs;
51        private Map<String, Object> proxies = new Hashtable<String, Object>();
52        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
53
54        public Broker(Properties env) throws Exception {
55                // Load log4j configuration
56                // URL log4jResource = Broker.class.getResource("/log4j.xml");
57                // DOMConfigurator.configure(log4jResource);
58
59                remoteObjs = new HashMap<String, RemoteObject>();
60                serializer = new Serializer(env);
61                environment = env;
62                connection = OmqConnectionFactory.getNewConnection(env);
63                channel = connection.createChannel();
64                addFaultTolerance();
65                if (!connection.isOpen() || !channel.isOpen()) {
66                        if (connection.isOpen()) {
67                                connection.close();
68                        }
69                        throw new InitBrokerException("The connection didn't work");
70                }
71
72                // try {
73                // tryConnection(env);
74                // } catch (Exception e) {
75                // channel.close();
76                // connection.close();
77                // throw new InitBrokerException("The connection didn't work");
78                // }
79        }
80
81        /**
82         * This method stops the broker's connection and all the threads created
83         *
84         * @throws Exception
85         */
86        public void stopBroker() throws Exception {
87                logger.warn("Stopping broker");
88                // Stop the client
89                if (clientStarted) {
90                        responseListener.kill();
91                        // TODO proxies = null; ??
92                }
93                // Stop all the remote objects working
94                for (String reference : remoteObjs.keySet()) {
95                        unbind(reference);
96                }
97
98                // Close the connection once all the listeners are died
99                closeConnection();
100
101                clientStarted = false;
102                // connectionClosed = false;
103                environment = null;
104                remoteObjs = null;
105        }
106
107        /**
108         * @return Broker's connection
109         * @throws Exception
110         */
111        public Connection getConnection() throws Exception {
112                return connection;
113        }
114
115        /**
116         * This method close the broker's connection
117         *
118         * @throws IOException
119         */
120        public void closeConnection() throws IOException {
121                logger.warn("Clossing connection");
122                connectionClosed = true;
123                connection.close();
124                // connectionClosed = false;
125        }
126
127        /**
128         * Return the broker's channel
129         *
130         * @return Broker's channel
131         * @throws Exception
132         */
133        public synchronized Channel getChannel() throws Exception {
134                return channel;
135        }
136
137        /**
138         * Creates a new channel using the Broker's connection
139         *
140         * @return newChannel
141         * @throws IOException
142         */
143        public synchronized Channel getNewChannel() throws IOException {
144                return connection.createChannel();
145        }
146
147        /**
148         *
149         */
150        public synchronized void publishMessge(String exchange, String routingKey, BasicProperties props, byte[] bytesRequest) throws IOException {
151                if (!channel.isOpen()) {
152                        logger.error("Broker's channel is closed opening a new one", channel.getCloseReason());
153                        channel = getNewChannel();
154                }
155                channel.basicPublish(exchange, routingKey, props, bytesRequest);
156        }
157
158        /**
159         * Returns the remote object for specified reference.
160         *
161         * @param reference
162         *            - Binding name
163         * @param contract
164         *            - Remote Interface
165         * @return newProxy
166         * @throws RemoteException
167         */
168        @SuppressWarnings("unchecked")
169        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
170                try {
171
172                        if (!clientStarted) {
173                                initClient();
174                        }
175
176                        if (!proxies.containsKey(reference)) {
177                                Proxymq proxy = new Proxymq(reference, contract, this);
178                                Class<?>[] array = { contract };
179                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
180                                proxies.put(reference, newProxy);
181                                return (T) newProxy;
182                        }
183                        return (T) proxies.get(reference);
184
185                } catch (Exception e) {
186                        throw new RemoteException(e);
187                }
188        }
189
190        /**
191         * Returns the remote object for specified reference. This function returns
192         * an special type of proxy, every method invoked will be multi and
193         * asynchronous.
194         *
195         * @param reference
196         *            - Binding name
197         * @param contract
198         *            - Remote Interface
199         * @return newProxy
200         * @throws RemoteException
201         */
202        @SuppressWarnings("unchecked")
203        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
204                try {
205                        if (!multiProxies.containsKey(reference)) {
206                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
207                                Class<?>[] array = { contract };
208                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
209                                multiProxies.put(reference, newProxy);
210                                return (T) newProxy;
211                        }
212                        return (T) multiProxies.get(reference);
213
214                } catch (Exception e) {
215                        throw new RemoteException(e);
216                }
217        }
218
219        /**
220         * Binds the reference to the specified remote object. This function uses
221         * the broker's environment
222         *
223         * @param reference
224         *            - Binding name
225         * @param remote
226         *            - RemoteObject to bind
227         * @throws RemoteException
228         *             If the remote operation failed
229         * @throws AlreadyBoundException
230         *             If name is already bound.
231         */
232        public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException {
233                bind(reference, remote, environment);
234        }
235
236        /**
237         * Binds the reference to the specified remote object. This function uses
238         * the broker's environment
239         *
240         * @param reference
241         *            - Binding name
242         * @param remote
243         *            - RemoteObject to bind
244         * @param env
245         *            - RemoteObject environment. You can set how many threads will
246         *            be listen to the reference, the multiqueue name and the
247         *            properties of the object queue and multiqueue
248         * @throws RemoteException
249         *             If the remote operation failed
250         * @throws AlreadyBoundException
251         *             If name is already bound.
252         */
253        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
254                if (remoteObjs.containsKey(reference)) {
255                        throw new AlreadyBoundException(reference);
256                }
257                // Try to start the remtoeObject listeners
258                try {
259                        remote.startRemoteObject(reference, this, env);
260                        remoteObjs.put(reference, remote);
261                } catch (Exception e) {
262                        throw new RemoteException(e);
263                }
264        }
265
266        /**
267         * Unbinds a remoteObject from its reference and kills all the threads
268         * created.
269         *
270         * @param reference
271         *            - Binding name
272         * @throws RemoteException
273         *             If the remote operation failed
274         * @throws IOException
275         *             If there are problems while killing the threads
276         */
277        public void unbind(String reference) throws RemoteException, IOException {
278                if (remoteObjs.containsKey(reference)) {
279                        RemoteObject remote = remoteObjs.get(reference);
280                        remote.kill();
281                } else {
282                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
283                }
284
285        }
286
287        /**
288         * This method ensures the client will have only one ResponseListener.
289         *
290         * @throws Exception
291         */
292        private synchronized void initClient() throws Exception {
293                if (responseListener == null) {
294                        responseListener = new ResponseListener(this);
295                        responseListener.start();
296                        clientStarted = true;
297                }
298        }
299
300        /**
301         * This function is used to send a ping message to see if the connection
302         * works
303         *
304         * @param env
305         * @throws Exception
306         */
307        public void tryConnection(Properties env) throws Exception {
308                Channel channel = connection.createChannel();
309                String message = "ping";
310
311                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
312                String queueName = exchange;
313                String routingKey = "routingKey";
314
315                channel.exchangeDeclare(exchange, "direct");
316                channel.queueDeclare(queueName, false, false, false, null);
317                channel.queueBind(queueName, exchange, routingKey);
318
319                channel.basicPublish(exchange, routingKey, null, message.getBytes());
320
321                QueueingConsumer consumer = new QueueingConsumer(channel);
322
323                channel.basicConsume(queueName, true, consumer);
324                Delivery delivery = consumer.nextDelivery(1000);
325
326                channel.exchangeDelete(exchange);
327                channel.queueDelete(queueName);
328
329                channel.close();
330
331                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
332                        throw new IOException("Ping initialitzation has failed");
333                }
334        }
335
336        /**
337         * This method adds a ShutdownListener to the Broker's connection. When this
338         * connection falls, a new connection will be created and this will also
339         * have the listener.
340         */
341        private void addFaultTolerance() {
342                connection.addShutdownListener(new ShutdownListener() {
343                        @Override
344                        public void shutdownCompleted(ShutdownSignalException cause) {
345                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
346                                if (!connectionClosed)
347                                        if (cause.isHardError()) {
348                                                if (connection.isOpen()) {
349                                                        try {
350                                                                connection.close();
351                                                        } catch (IOException e) {
352                                                                e.printStackTrace();
353                                                        }
354                                                }
355                                                try {
356                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
357                                                        channel = connection.createChannel();
358                                                        addFaultTolerance();
359                                                } catch (Exception e) {
360                                                        e.printStackTrace();
361                                                }
362                                        } else {
363                                                Channel channel = (Channel) cause.getReference();
364                                                if (channel.isOpen()) {
365                                                        try {
366                                                                channel.close();
367                                                        } catch (IOException e) {
368                                                                e.printStackTrace();
369                                                        }
370                                                }
371                                        }
372                        }
373                });
374        }
375
376        public Properties getEnvironment() {
377                return environment;
378        }
379
380        public ResponseListener getResponseListener() {
381                return responseListener;
382        }
383
384        public Serializer getSerializer() {
385                return serializer;
386        }
387}
Note: See TracBrowser for help on using the repository browser.