source: trunk/src/main/java/omq/common/broker/Broker.java @ 66

Last change on this file since 66 was 66, checked in by stoda, 11 years ago

startEventTrigger added

File size: 8.2 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.net.URL;
5import java.util.HashMap;
6import java.util.Hashtable;
7import java.util.Map;
8import java.util.Properties;
9
10import omq.Remote;
11import omq.client.listener.ResponseListener;
12import omq.client.proxy.Proxymq;
13import omq.common.event.Event;
14import omq.common.event.EventDispatcher;
15import omq.common.event.EventWrapper;
16import omq.common.util.OmqConnectionFactory;
17import omq.common.util.ParameterQueue;
18import omq.common.util.Serializer;
19import omq.exception.InitBrokerException;
20import omq.exception.RemoteException;
21import omq.exception.SerializerException;
22import omq.server.RemoteObject;
23
24import org.apache.log4j.Logger;
25import org.apache.log4j.xml.DOMConfigurator;
26
27import com.rabbitmq.client.Channel;
28import com.rabbitmq.client.Connection;
29import com.rabbitmq.client.QueueingConsumer;
30import com.rabbitmq.client.QueueingConsumer.Delivery;
31import com.rabbitmq.client.ShutdownListener;
32import com.rabbitmq.client.ShutdownSignalException;
33
34public class Broker {
35
36        private static final Logger logger = Logger.getLogger(Broker.class.getName());
37
38        private Connection connection;
39        private Channel channel;
40        private ResponseListener responseListener;
41        private EventDispatcher eventDispatcher;
42        private Serializer serializer;
43        private boolean clientStarted = false;
44        private boolean connectionClosed = false;
45        private Properties environment = null;
46        private Map<String, RemoteObject> remoteObjs;
47        private Map<String, Object> proxies = new Hashtable<String, Object>();
48
49        public Broker(Properties env) throws Exception {
50                // Load log4j configuration
51                URL log4jResource = Broker.class.getResource("/log4j.xml");
52                DOMConfigurator.configure(log4jResource);
53
54                remoteObjs = new HashMap<String, RemoteObject>();
55                serializer = new Serializer(env);
56                environment = env;
57                connection = OmqConnectionFactory.getNewConnection(env);
58                channel = connection.createChannel();
59                addFaultTolerance();
60                try {
61                        tryConnection(env);
62                } catch (Exception e) {
63                        channel.close();
64                        connection.close();
65                        throw new InitBrokerException("The connection didn't work");
66                }
67        }
68
69        public void stopBroker() throws Exception {
70                logger.warn("Stopping broker");
71                // Stop the client
72                if (clientStarted) {
73                        responseListener.kill();
74                        eventDispatcher.kill();
75                        //TODO proxies = null; ??
76                }
77                // Stop all the remote objects working
78                for (String reference : remoteObjs.keySet()) {
79                        unbind(reference);
80                }
81
82                // Close the connection once all the listeners are died
83                closeConnection();
84
85                clientStarted = false;
86                connectionClosed = false;
87                environment = null;
88                remoteObjs = null;
89                // Serializer.removeSerializers();
90        }
91
92        /**
93         * @return Broker's connection
94         * @throws Exception
95         */
96        public Connection getConnection() throws Exception {
97                return connection;
98        }
99
100        public void closeConnection() throws IOException {
101                logger.warn("Clossing connection");
102                connectionClosed = true;
103                connection.close();
104                connectionClosed = false;
105        }
106
107        /**
108         *
109         * @return Broker's channel
110         * @throws Exception
111         */
112        public Channel getChannel() throws Exception {
113                return channel;
114        }
115
116        /**
117         * Creates a new channel using the Broker's connection
118         *
119         * @return newChannel
120         * @throws IOException
121         */
122        public Channel getNewChannel() throws IOException {
123                return connection.createChannel();
124        }
125
126        @SuppressWarnings("unchecked")
127        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
128                try {
129
130                        if (!clientStarted) {
131                                initClient(environment);
132                                clientStarted = true;
133                        }
134
135                        if (!proxies.containsKey(reference)) {
136                                Proxymq proxy = new Proxymq(reference, contract, this);
137                                Class<?>[] array = { contract };
138                                Object newProxy = Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
139                                proxies.put(reference, newProxy);
140                                return (T) newProxy;
141                        }
142                        return (T) proxies.get(reference);
143
144                } catch (Exception e) {
145                        throw new RemoteException(e);
146                }
147        }
148
149        public void bind(String reference, RemoteObject remote) throws RemoteException {
150                try {
151                        remote.startRemoteObject(reference, this);
152                        remoteObjs.put(reference, remote);
153                } catch (Exception e) {
154                        throw new RemoteException(e);
155                }
156        }
157       
158        public void startTriggerEvent(String reference, RemoteObject remote) throws RemoteException {
159                try {
160                        remote.startTriggerEvent(reference, this);
161                        remoteObjs.put(reference, remote);
162                } catch (Exception e) {
163                        throw new RemoteException(e);
164                }
165        }
166
167        public void unbind(String reference) throws RemoteException, IOException {
168                if (remoteObjs.containsKey(reference)) {
169                        RemoteObject remote = remoteObjs.get(reference);
170                        remote.kill();
171                } else {
172                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
173                }
174
175        }
176
177        public void rebind(String name, Remote obj) throws RemoteException {
178
179        }
180
181        /**
182         * This method ensures the client will have only one ResponseListener and
183         * only one EventDispatcher. Both with the same environment.
184         *
185         * @param environment
186         * @throws Exception
187         */
188        private synchronized void initClient(Properties environment) throws Exception {
189                if (responseListener == null) {
190                        responseListener = new ResponseListener(this);
191                        responseListener.start();
192                }
193                if (eventDispatcher == null) {
194                        eventDispatcher = new EventDispatcher(this);
195                        eventDispatcher.start();
196                }
197        }
198
199        /**
200         * This method sends an event with its information
201         *
202         * @param event
203         * @throws IOException
204         * @throws SerializerException
205         */
206        public void trigger(Event event) throws IOException, SerializerException {
207                String UID = event.getTopic();
208                EventWrapper wrapper = new EventWrapper(event);
209                logger.debug("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
210                channel.exchangeDeclare(UID, "fanout");
211
212                byte[] bytesResponse = serializer.serialize(wrapper);
213                channel.basicPublish(UID, "", null, bytesResponse);
214        }
215
216        /**
217         * This function is used to send a ping message to see if the connection
218         * works
219         *
220         * @param env
221         * @throws Exception
222         */
223        public void tryConnection(Properties env) throws Exception {
224                Channel channel = connection.createChannel();
225                String message = "ping";
226
227                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
228                String queueName = exchange;
229                String routingKey = "routingKey";
230
231                channel.exchangeDeclare(exchange, "direct");
232                channel.queueDeclare(queueName, false, false, false, null);
233                channel.queueBind(queueName, exchange, routingKey);
234
235                channel.basicPublish(exchange, routingKey, null, message.getBytes());
236
237                QueueingConsumer consumer = new QueueingConsumer(channel);
238
239                channel.basicConsume(queueName, true, consumer);
240                Delivery delivery = consumer.nextDelivery(1000);
241
242                channel.exchangeDelete(exchange);
243                channel.queueDelete(queueName);
244
245                channel.close();
246
247                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
248                        throw new IOException("Ping initialitzation has failed");
249                }
250        }
251
252        /**
253         * This method adds a ShutdownListener to the Broker's connection. When this
254         * connection falls, a new connection will be created and this will also
255         * have the listener.
256         */
257        private void addFaultTolerance() {
258                connection.addShutdownListener(new ShutdownListener() {
259                        @Override
260                        public void shutdownCompleted(ShutdownSignalException cause) {
261                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
262                                if (!connectionClosed)
263                                        if (cause.isHardError()) {
264                                                if (connection.isOpen()) {
265                                                        try {
266                                                                connection.close();
267                                                        } catch (IOException e) {
268                                                                e.printStackTrace();
269                                                        }
270                                                }
271                                                try {
272                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
273                                                        channel = connection.createChannel();
274                                                        addFaultTolerance();
275                                                } catch (Exception e) {
276                                                        e.printStackTrace();
277                                                }
278                                        } else {
279                                                Channel channel = (Channel) cause.getReference();
280                                                if (channel.isOpen()) {
281                                                        try {
282                                                                channel.close();
283                                                        } catch (IOException e) {
284                                                                e.printStackTrace();
285                                                        }
286                                                }
287                                        }
288                        }
289                });
290        }
291
292        public Properties getEnvironment() {
293                return environment;
294        }
295
296        public ResponseListener getResponseListener() {
297                return responseListener;
298        }
299
300        public EventDispatcher getEventDispatcher() {
301                return eventDispatcher;
302        }
303
304        public Serializer getSerializer() {
305                return serializer;
306        }
307}
Note: See TracBrowser for help on using the repository browser.