source: trunk/src/main/java/omq/common/broker/Broker.java @ 71

Last change on this file since 71 was 70, checked in by stoda, 11 years ago

MultiProxymq? added

File size: 8.9 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.lang.reflect.Proxy;
5import java.net.URL;
6import java.util.HashMap;
7import java.util.Hashtable;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.client.listener.ResponseListener;
13import omq.client.proxy.MultiProxymq;
14import omq.client.proxy.Proxymq;
15import omq.common.event.Event;
16import omq.common.event.EventDispatcher;
17import omq.common.event.EventWrapper;
18import omq.common.util.OmqConnectionFactory;
19import omq.common.util.ParameterQueue;
20import omq.common.util.Serializer;
21import omq.exception.InitBrokerException;
22import omq.exception.RemoteException;
23import omq.exception.SerializerException;
24import omq.server.RemoteObject;
25
26import org.apache.log4j.Logger;
27import org.apache.log4j.xml.DOMConfigurator;
28
29import com.rabbitmq.client.Channel;
30import com.rabbitmq.client.Connection;
31import com.rabbitmq.client.QueueingConsumer;
32import com.rabbitmq.client.QueueingConsumer.Delivery;
33import com.rabbitmq.client.ShutdownListener;
34import com.rabbitmq.client.ShutdownSignalException;
35
36public class Broker {
37
38        private static final Logger logger = Logger.getLogger(Broker.class.getName());
39
40        private Connection connection;
41        private Channel channel;
42        private ResponseListener responseListener;
43        private EventDispatcher eventDispatcher;
44        private Serializer serializer;
45        private boolean clientStarted = false;
46        private boolean connectionClosed = false;
47        private Properties environment = null;
48        private Map<String, RemoteObject> remoteObjs;
49        private Map<String, Object> proxies = new Hashtable<String, Object>();
50        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
51
52        public Broker(Properties env) throws Exception {
53                // Load log4j configuration
54                URL log4jResource = Broker.class.getResource("/log4j.xml");
55                DOMConfigurator.configure(log4jResource);
56
57                remoteObjs = new HashMap<String, RemoteObject>();
58                serializer = new Serializer(env);
59                environment = env;
60                connection = OmqConnectionFactory.getNewConnection(env);
61                channel = connection.createChannel();
62                addFaultTolerance();
63                try {
64                        tryConnection(env);
65                } catch (Exception e) {
66                        channel.close();
67                        connection.close();
68                        throw new InitBrokerException("The connection didn't work");
69                }
70        }
71
72        public void stopBroker() throws Exception {
73                logger.warn("Stopping broker");
74                // Stop the client
75                if (clientStarted) {
76                        responseListener.kill();
77                        eventDispatcher.kill();
78                        // TODO proxies = null; ??
79                }
80                // Stop all the remote objects working
81                for (String reference : remoteObjs.keySet()) {
82                        unbind(reference);
83                }
84
85                // Close the connection once all the listeners are died
86                closeConnection();
87
88                clientStarted = false;
89                connectionClosed = false;
90                environment = null;
91                remoteObjs = null;
92                // Serializer.removeSerializers();
93        }
94
95        /**
96         * @return Broker's connection
97         * @throws Exception
98         */
99        public Connection getConnection() throws Exception {
100                return connection;
101        }
102
103        public void closeConnection() throws IOException {
104                logger.warn("Clossing connection");
105                connectionClosed = true;
106                connection.close();
107                connectionClosed = false;
108        }
109
110        /**
111         *
112         * @return Broker's channel
113         * @throws Exception
114         */
115        public Channel getChannel() throws Exception {
116                return channel;
117        }
118
119        /**
120         * Creates a new channel using the Broker's connection
121         *
122         * @return newChannel
123         * @throws IOException
124         */
125        public Channel getNewChannel() throws IOException {
126                return connection.createChannel();
127        }
128
129        @SuppressWarnings("unchecked")
130        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
131                try {
132
133                        if (!clientStarted) {
134                                initClient(environment);
135                                clientStarted = true;
136                        }
137
138                        if (!proxies.containsKey(reference)) {
139                                Proxymq proxy = new Proxymq(reference, contract, this);
140                                Class<?>[] array = { contract };
141                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
142                                proxies.put(reference, newProxy);
143                                return (T) newProxy;
144                        }
145                        return (T) proxies.get(reference);
146
147                } catch (Exception e) {
148                        throw new RemoteException(e);
149                }
150        }
151
152        @SuppressWarnings("unchecked")
153        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
154                try {
155                        if (!multiProxies.containsKey(reference)) {
156                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
157                                Class<?>[] array = { contract };
158                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
159                                multiProxies.put(reference, newProxy);
160                                return (T) newProxy;
161                        }
162                        return (T) multiProxies.get(reference);
163
164                } catch (Exception e) {
165                        throw new RemoteException(e);
166                }
167        }
168
169        public void bind(String reference, RemoteObject remote) throws RemoteException {
170                try {
171                        remote.startRemoteObject(reference, this);
172                        remoteObjs.put(reference, remote);
173                } catch (Exception e) {
174                        throw new RemoteException(e);
175                }
176        }
177
178        public void startTriggerEvent(String reference, RemoteObject remote) throws RemoteException {
179                try {
180                        remote.startTriggerEvent(reference, this);
181                        remoteObjs.put(reference, remote);
182                } catch (Exception e) {
183                        throw new RemoteException(e);
184                }
185        }
186
187        public void unbind(String reference) throws RemoteException, IOException {
188                if (remoteObjs.containsKey(reference)) {
189                        RemoteObject remote = remoteObjs.get(reference);
190                        remote.kill();
191                } else {
192                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
193                }
194
195        }
196
197        public void rebind(String name, Remote obj) throws RemoteException {
198
199        }
200
201        /**
202         * This method ensures the client will have only one ResponseListener and
203         * only one EventDispatcher. Both with the same environment.
204         *
205         * @param environment
206         * @throws Exception
207         */
208        private synchronized void initClient(Properties environment) throws Exception {
209                if (responseListener == null) {
210                        responseListener = new ResponseListener(this);
211                        responseListener.start();
212                }
213                if (eventDispatcher == null) {
214                        eventDispatcher = new EventDispatcher(this);
215                        eventDispatcher.start();
216                }
217        }
218
219        /**
220         * This method sends an event with its information
221         *
222         * @param event
223         * @throws IOException
224         * @throws SerializerException
225         */
226        public void trigger(Event event) throws IOException, SerializerException {
227                String UID = event.getTopic();
228                EventWrapper wrapper = new EventWrapper(event);
229                logger.debug("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
230                channel.exchangeDeclare(UID, "fanout");
231
232                byte[] bytesResponse = serializer.serialize(wrapper);
233                channel.basicPublish(UID, "", null, bytesResponse);
234        }
235
236        /**
237         * This function is used to send a ping message to see if the connection
238         * works
239         *
240         * @param env
241         * @throws Exception
242         */
243        public void tryConnection(Properties env) throws Exception {
244                Channel channel = connection.createChannel();
245                String message = "ping";
246
247                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
248                String queueName = exchange;
249                String routingKey = "routingKey";
250
251                channel.exchangeDeclare(exchange, "direct");
252                channel.queueDeclare(queueName, false, false, false, null);
253                channel.queueBind(queueName, exchange, routingKey);
254
255                channel.basicPublish(exchange, routingKey, null, message.getBytes());
256
257                QueueingConsumer consumer = new QueueingConsumer(channel);
258
259                channel.basicConsume(queueName, true, consumer);
260                Delivery delivery = consumer.nextDelivery(1000);
261
262                channel.exchangeDelete(exchange);
263                channel.queueDelete(queueName);
264
265                channel.close();
266
267                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
268                        throw new IOException("Ping initialitzation has failed");
269                }
270        }
271
272        /**
273         * This method adds a ShutdownListener to the Broker's connection. When this
274         * connection falls, a new connection will be created and this will also
275         * have the listener.
276         */
277        private void addFaultTolerance() {
278                connection.addShutdownListener(new ShutdownListener() {
279                        @Override
280                        public void shutdownCompleted(ShutdownSignalException cause) {
281                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
282                                if (!connectionClosed)
283                                        if (cause.isHardError()) {
284                                                if (connection.isOpen()) {
285                                                        try {
286                                                                connection.close();
287                                                        } catch (IOException e) {
288                                                                e.printStackTrace();
289                                                        }
290                                                }
291                                                try {
292                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
293                                                        channel = connection.createChannel();
294                                                        addFaultTolerance();
295                                                } catch (Exception e) {
296                                                        e.printStackTrace();
297                                                }
298                                        } else {
299                                                Channel channel = (Channel) cause.getReference();
300                                                if (channel.isOpen()) {
301                                                        try {
302                                                                channel.close();
303                                                        } catch (IOException e) {
304                                                                e.printStackTrace();
305                                                        }
306                                                }
307                                        }
308                        }
309                });
310        }
311
312        public Properties getEnvironment() {
313                return environment;
314        }
315
316        public ResponseListener getResponseListener() {
317                return responseListener;
318        }
319
320        public EventDispatcher getEventDispatcher() {
321                return eventDispatcher;
322        }
323
324        public Serializer getSerializer() {
325                return serializer;
326        }
327}
Note: See TracBrowser for help on using the repository browser.