source: trunk/src/main/java/omq/common/broker/Broker.java @ 72

Last change on this file since 72 was 72, checked in by stoda, 11 years ago

Events deleted instead of them there's a new example of how to use the observer pattern

File size: 7.9 KB
RevLine 
[44]1package omq.common.broker;
2
3import java.io.IOException;
[70]4import java.lang.reflect.Proxy;
[49]5import java.net.URL;
[44]6import java.util.HashMap;
[59]7import java.util.Hashtable;
[44]8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
[70]13import omq.client.proxy.MultiProxymq;
[44]14import omq.client.proxy.Proxymq;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.InitBrokerException;
19import omq.exception.RemoteException;
20import omq.server.RemoteObject;
21
[49]22import org.apache.log4j.Logger;
23import org.apache.log4j.xml.DOMConfigurator;
24
[44]25import com.rabbitmq.client.Channel;
26import com.rabbitmq.client.Connection;
27import com.rabbitmq.client.QueueingConsumer;
28import com.rabbitmq.client.QueueingConsumer.Delivery;
29import com.rabbitmq.client.ShutdownListener;
30import com.rabbitmq.client.ShutdownSignalException;
31
32public class Broker {
[49]33
34        private static final Logger logger = Logger.getLogger(Broker.class.getName());
35
[53]36        private Connection connection;
37        private Channel channel;
38        private ResponseListener responseListener;
39        private Serializer serializer;
40        private boolean clientStarted = false;
41        private boolean connectionClosed = false;
42        private Properties environment = null;
43        private Map<String, RemoteObject> remoteObjs;
[59]44        private Map<String, Object> proxies = new Hashtable<String, Object>();
[70]45        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
[44]46
[53]47        public Broker(Properties env) throws Exception {
48                // Load log4j configuration
49                URL log4jResource = Broker.class.getResource("/log4j.xml");
50                DOMConfigurator.configure(log4jResource);
[49]51
[53]52                remoteObjs = new HashMap<String, RemoteObject>();
53                serializer = new Serializer(env);
54                environment = env;
55                connection = OmqConnectionFactory.getNewConnection(env);
56                channel = connection.createChannel();
57                addFaultTolerance();
58                try {
59                        tryConnection(env);
60                } catch (Exception e) {
61                        channel.close();
62                        connection.close();
63                        throw new InitBrokerException("The connection didn't work");
[44]64                }
65        }
66
[53]67        public void stopBroker() throws Exception {
[49]68                logger.warn("Stopping broker");
[44]69                // Stop the client
70                if (clientStarted) {
[53]71                        responseListener.kill();
[70]72                        // TODO proxies = null; ??
[44]73                }
74                // Stop all the remote objects working
75                for (String reference : remoteObjs.keySet()) {
76                        unbind(reference);
77                }
[47]78
[44]79                // Close the connection once all the listeners are died
80                closeConnection();
[47]81
82                clientStarted = false;
83                connectionClosed = false;
84                environment = null;
85                remoteObjs = null;
[53]86                // Serializer.removeSerializers();
[44]87        }
88
89        /**
90         * @return Broker's connection
91         * @throws Exception
92         */
[53]93        public Connection getConnection() throws Exception {
[44]94                return connection;
95        }
96
[53]97        public void closeConnection() throws IOException {
[49]98                logger.warn("Clossing connection");
[44]99                connectionClosed = true;
100                connection.close();
[47]101                connectionClosed = false;
[44]102        }
103
104        /**
105         *
106         * @return Broker's channel
107         * @throws Exception
108         */
[53]109        public Channel getChannel() throws Exception {
[44]110                return channel;
111        }
112
113        /**
114         * Creates a new channel using the Broker's connection
115         *
116         * @return newChannel
117         * @throws IOException
118         */
[53]119        public Channel getNewChannel() throws IOException {
[44]120                return connection.createChannel();
121        }
122
123        @SuppressWarnings("unchecked")
[59]124        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
[44]125                try {
126
127                        if (!clientStarted) {
128                                initClient(environment);
129                                clientStarted = true;
130                        }
131
[59]132                        if (!proxies.containsKey(reference)) {
[53]133                                Proxymq proxy = new Proxymq(reference, contract, this);
[44]134                                Class<?>[] array = { contract };
[70]135                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
[59]136                                proxies.put(reference, newProxy);
137                                return (T) newProxy;
[44]138                        }
[59]139                        return (T) proxies.get(reference);
[44]140
141                } catch (Exception e) {
142                        throw new RemoteException(e);
143                }
144        }
145
[70]146        @SuppressWarnings("unchecked")
147        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
148                try {
149                        if (!multiProxies.containsKey(reference)) {
150                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
151                                Class<?>[] array = { contract };
152                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
153                                multiProxies.put(reference, newProxy);
154                                return (T) newProxy;
155                        }
156                        return (T) multiProxies.get(reference);
157
158                } catch (Exception e) {
159                        throw new RemoteException(e);
160                }
161        }
162
[53]163        public void bind(String reference, RemoteObject remote) throws RemoteException {
[44]164                try {
[53]165                        remote.startRemoteObject(reference, this);
[44]166                        remoteObjs.put(reference, remote);
167                } catch (Exception e) {
168                        throw new RemoteException(e);
169                }
170        }
[70]171
[66]172        public void startTriggerEvent(String reference, RemoteObject remote) throws RemoteException {
173                try {
174                        remote.startTriggerEvent(reference, this);
175                        remoteObjs.put(reference, remote);
176                } catch (Exception e) {
177                        throw new RemoteException(e);
178                }
179        }
[44]180
[53]181        public void unbind(String reference) throws RemoteException, IOException {
[44]182                if (remoteObjs.containsKey(reference)) {
183                        RemoteObject remote = remoteObjs.get(reference);
184                        remote.kill();
185                } else {
186                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
187                }
188
189        }
190
191        public void rebind(String name, Remote obj) throws RemoteException {
192
193        }
194
195        /**
196         * This method ensures the client will have only one ResponseListener and
197         * only one EventDispatcher. Both with the same environment.
198         *
199         * @param environment
200         * @throws Exception
201         */
[53]202        private synchronized void initClient(Properties environment) throws Exception {
203                if (responseListener == null) {
204                        responseListener = new ResponseListener(this);
[54]205                        responseListener.start();
[44]206                }
207        }
208
209        /**
210         * This function is used to send a ping message to see if the connection
211         * works
212         *
213         * @param env
214         * @throws Exception
215         */
[53]216        public void tryConnection(Properties env) throws Exception {
[44]217                Channel channel = connection.createChannel();
218                String message = "ping";
219
220                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
221                String queueName = exchange;
222                String routingKey = "routingKey";
223
224                channel.exchangeDeclare(exchange, "direct");
225                channel.queueDeclare(queueName, false, false, false, null);
226                channel.queueBind(queueName, exchange, routingKey);
227
228                channel.basicPublish(exchange, routingKey, null, message.getBytes());
229
230                QueueingConsumer consumer = new QueueingConsumer(channel);
231
232                channel.basicConsume(queueName, true, consumer);
233                Delivery delivery = consumer.nextDelivery(1000);
234
235                channel.exchangeDelete(exchange);
236                channel.queueDelete(queueName);
237
238                channel.close();
239
240                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
241                        throw new IOException("Ping initialitzation has failed");
242                }
243        }
244
245        /**
246         * This method adds a ShutdownListener to the Broker's connection. When this
247         * connection falls, a new connection will be created and this will also
248         * have the listener.
249         */
[53]250        private void addFaultTolerance() {
[44]251                connection.addShutdownListener(new ShutdownListener() {
252                        @Override
253                        public void shutdownCompleted(ShutdownSignalException cause) {
[49]254                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
[44]255                                if (!connectionClosed)
256                                        if (cause.isHardError()) {
257                                                if (connection.isOpen()) {
258                                                        try {
259                                                                connection.close();
260                                                        } catch (IOException e) {
261                                                                e.printStackTrace();
262                                                        }
263                                                }
264                                                try {
[47]265                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
[44]266                                                        channel = connection.createChannel();
267                                                        addFaultTolerance();
268                                                } catch (Exception e) {
269                                                        e.printStackTrace();
270                                                }
271                                        } else {
272                                                Channel channel = (Channel) cause.getReference();
273                                                if (channel.isOpen()) {
274                                                        try {
275                                                                channel.close();
276                                                        } catch (IOException e) {
277                                                                e.printStackTrace();
278                                                        }
279                                                }
280                                        }
281                        }
282                });
283        }
284
[53]285        public Properties getEnvironment() {
[47]286                return environment;
287        }
288
[53]289        public ResponseListener getResponseListener() {
290                return responseListener;
291        }
292
293        public Serializer getSerializer() {
294                return serializer;
295        }
[44]296}
Note: See TracBrowser for help on using the repository browser.