source: trunk/src/main/java/omq/common/broker/Broker.java @ 74

Last change on this file since 74 was 74, checked in by stoda, 11 years ago

WorkspaceTest?

File size: 7.8 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.lang.reflect.Proxy;
5import java.net.URL;
6import java.util.HashMap;
7import java.util.Hashtable;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
13import omq.client.proxy.MultiProxymq;
14import omq.client.proxy.Proxymq;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.InitBrokerException;
19import omq.exception.RemoteException;
20import omq.server.RemoteObject;
21
22import org.apache.log4j.Logger;
23import org.apache.log4j.xml.DOMConfigurator;
24
25import com.rabbitmq.client.Channel;
26import com.rabbitmq.client.Connection;
27import com.rabbitmq.client.QueueingConsumer;
28import com.rabbitmq.client.QueueingConsumer.Delivery;
29import com.rabbitmq.client.ShutdownListener;
30import com.rabbitmq.client.ShutdownSignalException;
31
32public class Broker {
33
34        private static final Logger logger = Logger.getLogger(Broker.class.getName());
35
36        private Connection connection;
37        private Channel channel;
38        private ResponseListener responseListener;
39        private Serializer serializer;
40        private boolean clientStarted = false;
41        private boolean connectionClosed = false;
42        private Properties environment = null;
43        private Map<String, RemoteObject> remoteObjs;
44        private Map<String, Object> proxies = new Hashtable<String, Object>();
45        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
46
47        public Broker(Properties env) throws Exception {
48                // Load log4j configuration
49                URL log4jResource = Broker.class.getResource("/log4j.xml");
50                DOMConfigurator.configure(log4jResource);
51
52                remoteObjs = new HashMap<String, RemoteObject>();
53                serializer = new Serializer(env);
54                environment = env;
55                connection = OmqConnectionFactory.getNewConnection(env);
56                channel = connection.createChannel();
57                addFaultTolerance();
58                try {
59                        tryConnection(env);
60                } catch (Exception e) {
61                        channel.close();
62                        connection.close();
63                        throw new InitBrokerException("The connection didn't work");
64                }
65        }
66
67        public void stopBroker() throws Exception {
68                logger.warn("Stopping broker");
69                // Stop the client
70                if (clientStarted) {
71                        responseListener.kill();
72                        // TODO proxies = null; ??
73                }
74                // Stop all the remote objects working
75                for (String reference : remoteObjs.keySet()) {
76                        unbind(reference);
77                }
78
79                // Close the connection once all the listeners are died
80                closeConnection();
81
82                clientStarted = false;
83                connectionClosed = false;
84                environment = null;
85                remoteObjs = null;
86                // Serializer.removeSerializers();
87        }
88
89        /**
90         * @return Broker's connection
91         * @throws Exception
92         */
93        public Connection getConnection() throws Exception {
94                return connection;
95        }
96
97        public void closeConnection() throws IOException {
98                logger.warn("Clossing connection");
99                connectionClosed = true;
100                connection.close();
101                connectionClosed = false;
102        }
103
104        /**
105         *
106         * @return Broker's channel
107         * @throws Exception
108         */
109        public Channel getChannel() throws Exception {
110                return channel;
111        }
112
113        /**
114         * Creates a new channel using the Broker's connection
115         *
116         * @return newChannel
117         * @throws IOException
118         */
119        public Channel getNewChannel() throws IOException {
120                return connection.createChannel();
121        }
122
123        @SuppressWarnings("unchecked")
124        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
125                try {
126
127                        if (!clientStarted) {
128                                initClient(environment);
129                                clientStarted = true;
130                        }
131
132                        if (!proxies.containsKey(reference)) {
133                                Proxymq proxy = new Proxymq(reference, contract, this);
134                                Class<?>[] array = { contract };
135                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
136                                proxies.put(reference, newProxy);
137                                return (T) newProxy;
138                        }
139                        return (T) proxies.get(reference);
140
141                } catch (Exception e) {
142                        throw new RemoteException(e);
143                }
144        }
145
146        @SuppressWarnings("unchecked")
147        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
148                try {
149                        if (!multiProxies.containsKey(reference)) {
150                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
151                                Class<?>[] array = { contract };
152                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
153                                multiProxies.put(reference, newProxy);
154                                return (T) newProxy;
155                        }
156                        return (T) multiProxies.get(reference);
157
158                } catch (Exception e) {
159                        throw new RemoteException(e);
160                }
161        }
162
163        public void bind(String reference, RemoteObject remote) throws RemoteException {
164                bind(reference, remote, environment);
165        }
166
167        public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException {
168                try {
169                        remote.startRemoteObject(reference, this, env);
170                        remoteObjs.put(reference, remote);
171                } catch (Exception e) {
172                        throw new RemoteException(e);
173                }
174        }
175
176        public void unbind(String reference) throws RemoteException, IOException {
177                if (remoteObjs.containsKey(reference)) {
178                        RemoteObject remote = remoteObjs.get(reference);
179                        remote.kill();
180                } else {
181                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
182                }
183
184        }
185
186        public void rebind(String name, Remote obj) throws RemoteException {
187
188        }
189
190        /**
191         * This method ensures the client will have only one ResponseListener and
192         * only one EventDispatcher. Both with the same environment.
193         *
194         * @param environment
195         * @throws Exception
196         */
197        private synchronized void initClient(Properties environment) throws Exception {
198                if (responseListener == null) {
199                        responseListener = new ResponseListener(this);
200                        responseListener.start();
201                }
202        }
203
204        /**
205         * This function is used to send a ping message to see if the connection
206         * works
207         *
208         * @param env
209         * @throws Exception
210         */
211        public void tryConnection(Properties env) throws Exception {
212                Channel channel = connection.createChannel();
213                String message = "ping";
214
215                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
216                String queueName = exchange;
217                String routingKey = "routingKey";
218
219                channel.exchangeDeclare(exchange, "direct");
220                channel.queueDeclare(queueName, false, false, false, null);
221                channel.queueBind(queueName, exchange, routingKey);
222
223                channel.basicPublish(exchange, routingKey, null, message.getBytes());
224
225                QueueingConsumer consumer = new QueueingConsumer(channel);
226
227                channel.basicConsume(queueName, true, consumer);
228                Delivery delivery = consumer.nextDelivery(1000);
229
230                channel.exchangeDelete(exchange);
231                channel.queueDelete(queueName);
232
233                channel.close();
234
235                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
236                        throw new IOException("Ping initialitzation has failed");
237                }
238        }
239
240        /**
241         * This method adds a ShutdownListener to the Broker's connection. When this
242         * connection falls, a new connection will be created and this will also
243         * have the listener.
244         */
245        private void addFaultTolerance() {
246                connection.addShutdownListener(new ShutdownListener() {
247                        @Override
248                        public void shutdownCompleted(ShutdownSignalException cause) {
249                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
250                                if (!connectionClosed)
251                                        if (cause.isHardError()) {
252                                                if (connection.isOpen()) {
253                                                        try {
254                                                                connection.close();
255                                                        } catch (IOException e) {
256                                                                e.printStackTrace();
257                                                        }
258                                                }
259                                                try {
260                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
261                                                        channel = connection.createChannel();
262                                                        addFaultTolerance();
263                                                } catch (Exception e) {
264                                                        e.printStackTrace();
265                                                }
266                                        } else {
267                                                Channel channel = (Channel) cause.getReference();
268                                                if (channel.isOpen()) {
269                                                        try {
270                                                                channel.close();
271                                                        } catch (IOException e) {
272                                                                e.printStackTrace();
273                                                        }
274                                                }
275                                        }
276                        }
277                });
278        }
279
280        public Properties getEnvironment() {
281                return environment;
282        }
283
284        public ResponseListener getResponseListener() {
285                return responseListener;
286        }
287
288        public Serializer getSerializer() {
289                return serializer;
290        }
291}
Note: See TracBrowser for help on using the repository browser.