source: branches/objectmqListeners/src/main/java/omq/common/broker/Broker.java @ 87

Last change on this file since 87 was 87, checked in by stoda, 11 years ago
File size: 8.1 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.util.HashMap;
5import java.util.Hashtable;
6import java.util.Map;
7import java.util.Properties;
8
9import omq.Remote;
10import omq.client.listener.ResponseListener;
11import omq.client.proxy.Proxymq;
12import omq.common.event.Event;
13import omq.common.event.EventDispatcher;
14import omq.common.event.EventWrapper;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.InitBrokerException;
19import omq.exception.RemoteException;
20import omq.exception.SerializerException;
21import omq.server.RemoteObject;
22
23import org.apache.log4j.Logger;
24
25import com.rabbitmq.client.Channel;
26import com.rabbitmq.client.Connection;
27import com.rabbitmq.client.QueueingConsumer;
28import com.rabbitmq.client.QueueingConsumer.Delivery;
29import com.rabbitmq.client.ShutdownListener;
30import com.rabbitmq.client.ShutdownSignalException;
31
32public class Broker {
33
34        private static final Logger logger = Logger.getLogger(Broker.class.getName());
35
36        private Connection connection;
37        private Channel channel;
38        private ResponseListener responseListener;
39        private EventDispatcher eventDispatcher;
40        private Serializer serializer;
41        private boolean clientStarted = false;
42        private boolean connectionClosed = false;
43        private Properties environment = null;
44        private Map<String, RemoteObject> remoteObjs;
45        private Map<String, Object> proxies = new Hashtable<String, Object>();
46
47        public Broker(Properties env) throws Exception {
48                // Load log4j configuration
49                // URL log4jResource = Broker.class.getResource("/log4j.xml");
50                // DOMConfigurator.configure(log4jResource);
51
52                remoteObjs = new HashMap<String, RemoteObject>();
53                serializer = new Serializer(env);
54                environment = env;
55                connection = OmqConnectionFactory.getNewConnection(env);
56                channel = connection.createChannel();
57                addFaultTolerance();
58                try {
59                        tryConnection(env);
60                } catch (Exception e) {
61                        channel.close();
62                        connection.close();
63                        throw new InitBrokerException("The connection didn't work");
64                }
65        }
66
67        public void stopBroker() throws Exception {
68                logger.warn("Stopping broker");
69                // Stop the client
70                if (clientStarted) {
71                        responseListener.kill();
72                        eventDispatcher.kill();
73                        //TODO proxies = null; ??
74                }
75                // Stop all the remote objects working
76                for (String reference : remoteObjs.keySet()) {
77                        unbind(reference);
78                }
79
80                // Close the connection once all the listeners are died
81                closeConnection();
82
83                clientStarted = false;
84                connectionClosed = false;
85                environment = null;
86                remoteObjs = null;
87                // Serializer.removeSerializers();
88        }
89
90        /**
91         * @return Broker's connection
92         * @throws Exception
93         */
94        public Connection getConnection() throws Exception {
95                return connection;
96        }
97
98        public void closeConnection() throws IOException {
99                logger.warn("Clossing connection");
100                connectionClosed = true;
101                connection.close();
102                connectionClosed = false;
103        }
104
105        /**
106         *
107         * @return Broker's channel
108         * @throws Exception
109         */
110        public Channel getChannel() throws Exception {
111                return channel;
112        }
113
114        /**
115         * Creates a new channel using the Broker's connection
116         *
117         * @return newChannel
118         * @throws IOException
119         */
120        public Channel getNewChannel() throws IOException {
121                return connection.createChannel();
122        }
123
124        @SuppressWarnings("unchecked")
125        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
126                try {
127
128                        if (!clientStarted) {
129                                initClient(environment);
130                                clientStarted = true;
131                        }
132
133                        if (!proxies.containsKey(reference)) {
134                                Proxymq proxy = new Proxymq(reference, contract, this);
135                                Class<?>[] array = { contract };
136                                Object newProxy = Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
137                                proxies.put(reference, newProxy);
138                                return (T) newProxy;
139                        }
140                        return (T) proxies.get(reference);
141
142                } catch (Exception e) {
143                        throw new RemoteException(e);
144                }
145        }
146
147        public void bind(String reference, RemoteObject remote) throws RemoteException {
148                try {
149                        remote.startRemoteObject(reference, this);
150                        remoteObjs.put(reference, remote);
151                } catch (Exception e) {
152                        throw new RemoteException(e);
153                }
154        }
155       
156        public void startTriggerEvent(String reference, RemoteObject remote) throws RemoteException {
157                try {
158                        remote.startTriggerEvent(reference, this);
159                        remoteObjs.put(reference, remote);
160                } catch (Exception e) {
161                        throw new RemoteException(e);
162                }
163        }
164
165        public void unbind(String reference) throws RemoteException, IOException {
166                if (remoteObjs.containsKey(reference)) {
167                        RemoteObject remote = remoteObjs.get(reference);
168                        remote.kill();
169                } else {
170                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
171                }
172
173        }
174
175        public void rebind(String name, Remote obj) throws RemoteException {
176
177        }
178
179        /**
180         * This method ensures the client will have only one ResponseListener and
181         * only one EventDispatcher. Both with the same environment.
182         *
183         * @param environment
184         * @throws Exception
185         */
186        private synchronized void initClient(Properties environment) throws Exception {
187                if (responseListener == null) {
188                        responseListener = new ResponseListener(this);
189                        responseListener.start();
190                }
191                if (eventDispatcher == null) {
192                        eventDispatcher = new EventDispatcher(this);
193                        eventDispatcher.start();
194                }
195        }
196
197        /**
198         * This method sends an event with its information
199         *
200         * @param event
201         * @throws IOException
202         * @throws SerializerException
203         */
204        public void trigger(Event event) throws IOException, SerializerException {
205                String UID = event.getTopic();
206                EventWrapper wrapper = new EventWrapper(event);
207                logger.debug("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
208                channel.exchangeDeclare(UID, "fanout");
209
210                byte[] bytesResponse = serializer.serialize(wrapper);
211                channel.basicPublish(UID, "", null, bytesResponse);
212        }
213
214        /**
215         * This function is used to send a ping message to see if the connection
216         * works
217         *
218         * @param env
219         * @throws Exception
220         */
221        public void tryConnection(Properties env) throws Exception {
222                Channel channel = connection.createChannel();
223                String message = "ping";
224
225                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
226                String queueName = exchange;
227                String routingKey = "routingKey";
228
229                channel.exchangeDeclare(exchange, "direct");
230                channel.queueDeclare(queueName, false, false, false, null);
231                channel.queueBind(queueName, exchange, routingKey);
232
233                channel.basicPublish(exchange, routingKey, null, message.getBytes());
234
235                QueueingConsumer consumer = new QueueingConsumer(channel);
236
237                channel.basicConsume(queueName, true, consumer);
238                Delivery delivery = consumer.nextDelivery(1000);
239
240                channel.exchangeDelete(exchange);
241                channel.queueDelete(queueName);
242
243                channel.close();
244
245                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
246                        throw new IOException("Ping initialitzation has failed");
247                }
248        }
249
250        /**
251         * This method adds a ShutdownListener to the Broker's connection. When this
252         * connection falls, a new connection will be created and this will also
253         * have the listener.
254         */
255        private void addFaultTolerance() {
256                connection.addShutdownListener(new ShutdownListener() {
257                        @Override
258                        public void shutdownCompleted(ShutdownSignalException cause) {
259                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
260                                if (!connectionClosed)
261                                        if (cause.isHardError()) {
262                                                if (connection.isOpen()) {
263                                                        try {
264                                                                connection.close();
265                                                        } catch (IOException e) {
266                                                                e.printStackTrace();
267                                                        }
268                                                }
269                                                try {
270                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
271                                                        channel = connection.createChannel();
272                                                        addFaultTolerance();
273                                                } catch (Exception e) {
274                                                        e.printStackTrace();
275                                                }
276                                        } else {
277                                                Channel channel = (Channel) cause.getReference();
278                                                if (channel.isOpen()) {
279                                                        try {
280                                                                channel.close();
281                                                        } catch (IOException e) {
282                                                                e.printStackTrace();
283                                                        }
284                                                }
285                                        }
286                        }
287                });
288        }
289
290        public Properties getEnvironment() {
291                return environment;
292        }
293
294        public ResponseListener getResponseListener() {
295                return responseListener;
296        }
297
298        public EventDispatcher getEventDispatcher() {
299                return eventDispatcher;
300        }
301
302        public Serializer getSerializer() {
303                return serializer;
304        }
305}
Note: See TracBrowser for help on using the repository browser.