source: trunk/src/main/java/omq/common/broker/Broker.java @ 74

Last change on this file since 74 was 74, checked in by stoda, 11 years ago

WorkspaceTest?

File size: 7.8 KB
RevLine 
[44]1package omq.common.broker;
2
3import java.io.IOException;
[70]4import java.lang.reflect.Proxy;
[49]5import java.net.URL;
[44]6import java.util.HashMap;
[59]7import java.util.Hashtable;
[44]8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
[70]13import omq.client.proxy.MultiProxymq;
[44]14import omq.client.proxy.Proxymq;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.InitBrokerException;
19import omq.exception.RemoteException;
20import omq.server.RemoteObject;
21
[49]22import org.apache.log4j.Logger;
23import org.apache.log4j.xml.DOMConfigurator;
24
[44]25import com.rabbitmq.client.Channel;
26import com.rabbitmq.client.Connection;
27import com.rabbitmq.client.QueueingConsumer;
28import com.rabbitmq.client.QueueingConsumer.Delivery;
29import com.rabbitmq.client.ShutdownListener;
30import com.rabbitmq.client.ShutdownSignalException;
31
32public class Broker {
[49]33
34        private static final Logger logger = Logger.getLogger(Broker.class.getName());
35
[53]36        private Connection connection;
37        private Channel channel;
38        private ResponseListener responseListener;
39        private Serializer serializer;
40        private boolean clientStarted = false;
41        private boolean connectionClosed = false;
42        private Properties environment = null;
43        private Map<String, RemoteObject> remoteObjs;
[59]44        private Map<String, Object> proxies = new Hashtable<String, Object>();
[70]45        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
[44]46
[53]47        public Broker(Properties env) throws Exception {
48                // Load log4j configuration
49                URL log4jResource = Broker.class.getResource("/log4j.xml");
50                DOMConfigurator.configure(log4jResource);
[49]51
[53]52                remoteObjs = new HashMap<String, RemoteObject>();
53                serializer = new Serializer(env);
54                environment = env;
55                connection = OmqConnectionFactory.getNewConnection(env);
56                channel = connection.createChannel();
57                addFaultTolerance();
58                try {
59                        tryConnection(env);
60                } catch (Exception e) {
61                        channel.close();
62                        connection.close();
63                        throw new InitBrokerException("The connection didn't work");
[44]64                }
65        }
66
[53]67        public void stopBroker() throws Exception {
[49]68                logger.warn("Stopping broker");
[44]69                // Stop the client
70                if (clientStarted) {
[53]71                        responseListener.kill();
[70]72                        // TODO proxies = null; ??
[44]73                }
74                // Stop all the remote objects working
75                for (String reference : remoteObjs.keySet()) {
76                        unbind(reference);
77                }
[47]78
[44]79                // Close the connection once all the listeners are died
80                closeConnection();
[47]81
82                clientStarted = false;
83                connectionClosed = false;
84                environment = null;
85                remoteObjs = null;
[53]86                // Serializer.removeSerializers();
[44]87        }
88
89        /**
90         * @return Broker's connection
91         * @throws Exception
92         */
[53]93        public Connection getConnection() throws Exception {
[44]94                return connection;
95        }
96
[53]97        public void closeConnection() throws IOException {
[49]98                logger.warn("Clossing connection");
[44]99                connectionClosed = true;
100                connection.close();
[47]101                connectionClosed = false;
[44]102        }
103
104        /**
105         *
106         * @return Broker's channel
107         * @throws Exception
108         */
[53]109        public Channel getChannel() throws Exception {
[44]110                return channel;
111        }
112
113        /**
114         * Creates a new channel using the Broker's connection
115         *
116         * @return newChannel
117         * @throws IOException
118         */
[53]119        public Channel getNewChannel() throws IOException {
[44]120                return connection.createChannel();
121        }
122
123        @SuppressWarnings("unchecked")
[59]124        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
[44]125                try {
126
127                        if (!clientStarted) {
128                                initClient(environment);
129                                clientStarted = true;
130                        }
131
[59]132                        if (!proxies.containsKey(reference)) {
[53]133                                Proxymq proxy = new Proxymq(reference, contract, this);
[44]134                                Class<?>[] array = { contract };
[70]135                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
[59]136                                proxies.put(reference, newProxy);
137                                return (T) newProxy;
[44]138                        }
[59]139                        return (T) proxies.get(reference);
[44]140
141                } catch (Exception e) {
142                        throw new RemoteException(e);
143                }
144        }
145
[70]146        @SuppressWarnings("unchecked")
147        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
148                try {
149                        if (!multiProxies.containsKey(reference)) {
150                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
151                                Class<?>[] array = { contract };
152                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
153                                multiProxies.put(reference, newProxy);
154                                return (T) newProxy;
155                        }
156                        return (T) multiProxies.get(reference);
157
158                } catch (Exception e) {
159                        throw new RemoteException(e);
160                }
161        }
162
[53]163        public void bind(String reference, RemoteObject remote) throws RemoteException {
[74]164                bind(reference, remote, environment);
[44]165        }
[70]166
[74]167        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException {
[66]168                try {
[74]169                        remote.startRemoteObject(reference, this, env);
[66]170                        remoteObjs.put(reference, remote);
171                } catch (Exception e) {
172                        throw new RemoteException(e);
173                }
174        }
[44]175
[53]176        public void unbind(String reference) throws RemoteException, IOException {
[44]177                if (remoteObjs.containsKey(reference)) {
178                        RemoteObject remote = remoteObjs.get(reference);
179                        remote.kill();
180                } else {
181                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
182                }
183
184        }
185
186        public void rebind(String name, Remote obj) throws RemoteException {
187
188        }
189
190        /**
191         * This method ensures the client will have only one ResponseListener and
192         * only one EventDispatcher. Both with the same environment.
193         *
194         * @param environment
195         * @throws Exception
196         */
[53]197        private synchronized void initClient(Properties environment) throws Exception {
198                if (responseListener == null) {
199                        responseListener = new ResponseListener(this);
[54]200                        responseListener.start();
[44]201                }
202        }
203
204        /**
205         * This function is used to send a ping message to see if the connection
206         * works
207         *
208         * @param env
209         * @throws Exception
210         */
[53]211        public void tryConnection(Properties env) throws Exception {
[44]212                Channel channel = connection.createChannel();
213                String message = "ping";
214
215                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
216                String queueName = exchange;
217                String routingKey = "routingKey";
218
219                channel.exchangeDeclare(exchange, "direct");
220                channel.queueDeclare(queueName, false, false, false, null);
221                channel.queueBind(queueName, exchange, routingKey);
222
223                channel.basicPublish(exchange, routingKey, null, message.getBytes());
224
225                QueueingConsumer consumer = new QueueingConsumer(channel);
226
227                channel.basicConsume(queueName, true, consumer);
228                Delivery delivery = consumer.nextDelivery(1000);
229
230                channel.exchangeDelete(exchange);
231                channel.queueDelete(queueName);
232
233                channel.close();
234
235                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
236                        throw new IOException("Ping initialitzation has failed");
237                }
238        }
239
240        /**
241         * This method adds a ShutdownListener to the Broker's connection. When this
242         * connection falls, a new connection will be created and this will also
243         * have the listener.
244         */
[53]245        private void addFaultTolerance() {
[44]246                connection.addShutdownListener(new ShutdownListener() {
247                        @Override
248                        public void shutdownCompleted(ShutdownSignalException cause) {
[49]249                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
[44]250                                if (!connectionClosed)
251                                        if (cause.isHardError()) {
252                                                if (connection.isOpen()) {
253                                                        try {
254                                                                connection.close();
255                                                        } catch (IOException e) {
256                                                                e.printStackTrace();
257                                                        }
258                                                }
259                                                try {
[47]260                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
[44]261                                                        channel = connection.createChannel();
262                                                        addFaultTolerance();
263                                                } catch (Exception e) {
264                                                        e.printStackTrace();
265                                                }
266                                        } else {
267                                                Channel channel = (Channel) cause.getReference();
268                                                if (channel.isOpen()) {
269                                                        try {
270                                                                channel.close();
271                                                        } catch (IOException e) {
272                                                                e.printStackTrace();
273                                                        }
274                                                }
275                                        }
276                        }
277                });
278        }
279
[53]280        public Properties getEnvironment() {
[47]281                return environment;
282        }
283
[53]284        public ResponseListener getResponseListener() {
285                return responseListener;
286        }
287
288        public Serializer getSerializer() {
289                return serializer;
290        }
[44]291}
Note: See TracBrowser for help on using the repository browser.