source: trunk/src/main/java/omq/common/broker/Broker.java @ 62

Last change on this file since 62 was 59, checked in by stoda, 11 years ago

private static proxies in Proxymq deleted -> non static proxies moved to Broker

File size: 7.9 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.net.URL;
5import java.util.HashMap;
6import java.util.Hashtable;
7import java.util.Map;
8import java.util.Properties;
9
10import omq.Remote;
11import omq.client.listener.ResponseListener;
12import omq.client.proxy.Proxymq;
13import omq.common.event.Event;
14import omq.common.event.EventDispatcher;
15import omq.common.event.EventWrapper;
16import omq.common.util.OmqConnectionFactory;
17import omq.common.util.ParameterQueue;
18import omq.common.util.Serializer;
19import omq.exception.InitBrokerException;
20import omq.exception.RemoteException;
21import omq.exception.SerializerException;
22import omq.server.RemoteObject;
23
24import org.apache.log4j.Logger;
25import org.apache.log4j.xml.DOMConfigurator;
26
27import com.rabbitmq.client.Channel;
28import com.rabbitmq.client.Connection;
29import com.rabbitmq.client.QueueingConsumer;
30import com.rabbitmq.client.QueueingConsumer.Delivery;
31import com.rabbitmq.client.ShutdownListener;
32import com.rabbitmq.client.ShutdownSignalException;
33
34public class Broker {
35
36        private static final Logger logger = Logger.getLogger(Broker.class.getName());
37
38        private Connection connection;
39        private Channel channel;
40        private ResponseListener responseListener;
41        private EventDispatcher eventDispatcher;
42        private Serializer serializer;
43        private boolean clientStarted = false;
44        private boolean connectionClosed = false;
45        private Properties environment = null;
46        private Map<String, RemoteObject> remoteObjs;
47        private Map<String, Object> proxies = new Hashtable<String, Object>();
48
49        public Broker(Properties env) throws Exception {
50                // Load log4j configuration
51                URL log4jResource = Broker.class.getResource("/log4j.xml");
52                DOMConfigurator.configure(log4jResource);
53
54                remoteObjs = new HashMap<String, RemoteObject>();
55                serializer = new Serializer(env);
56                environment = env;
57                connection = OmqConnectionFactory.getNewConnection(env);
58                channel = connection.createChannel();
59                addFaultTolerance();
60                try {
61                        tryConnection(env);
62                } catch (Exception e) {
63                        channel.close();
64                        connection.close();
65                        throw new InitBrokerException("The connection didn't work");
66                }
67        }
68
69        public void stopBroker() throws Exception {
70                logger.warn("Stopping broker");
71                // Stop the client
72                if (clientStarted) {
73                        responseListener.kill();
74                        eventDispatcher.kill();
75                        //TODO proxies = null; ??
76                }
77                // Stop all the remote objects working
78                for (String reference : remoteObjs.keySet()) {
79                        unbind(reference);
80                }
81
82                // Close the connection once all the listeners are died
83                closeConnection();
84
85                clientStarted = false;
86                connectionClosed = false;
87                environment = null;
88                remoteObjs = null;
89                // Serializer.removeSerializers();
90        }
91
92        /**
93         * @return Broker's connection
94         * @throws Exception
95         */
96        public Connection getConnection() throws Exception {
97                return connection;
98        }
99
100        public void closeConnection() throws IOException {
101                logger.warn("Clossing connection");
102                connectionClosed = true;
103                connection.close();
104                connectionClosed = false;
105        }
106
107        /**
108         *
109         * @return Broker's channel
110         * @throws Exception
111         */
112        public Channel getChannel() throws Exception {
113                return channel;
114        }
115
116        /**
117         * Creates a new channel using the Broker's connection
118         *
119         * @return newChannel
120         * @throws IOException
121         */
122        public Channel getNewChannel() throws IOException {
123                return connection.createChannel();
124        }
125
126        @SuppressWarnings("unchecked")
127        public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
128                try {
129
130                        if (!clientStarted) {
131                                initClient(environment);
132                                clientStarted = true;
133                        }
134
135                        if (!proxies.containsKey(reference)) {
136                                Proxymq proxy = new Proxymq(reference, contract, this);
137                                Class<?>[] array = { contract };
138                                Object newProxy = Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
139                                proxies.put(reference, newProxy);
140                                return (T) newProxy;
141                        }
142                        return (T) proxies.get(reference);
143
144                } catch (Exception e) {
145                        throw new RemoteException(e);
146                }
147        }
148
149        public void bind(String reference, RemoteObject remote) throws RemoteException {
150                try {
151                        remote.startRemoteObject(reference, this);
152                        remoteObjs.put(reference, remote);
153                } catch (Exception e) {
154                        throw new RemoteException(e);
155                }
156        }
157
158        public void unbind(String reference) throws RemoteException, IOException {
159                if (remoteObjs.containsKey(reference)) {
160                        RemoteObject remote = remoteObjs.get(reference);
161                        remote.kill();
162                } else {
163                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
164                }
165
166        }
167
168        public void rebind(String name, Remote obj) throws RemoteException {
169
170        }
171
172        /**
173         * This method ensures the client will have only one ResponseListener and
174         * only one EventDispatcher. Both with the same environment.
175         *
176         * @param environment
177         * @throws Exception
178         */
179        private synchronized void initClient(Properties environment) throws Exception {
180                if (responseListener == null) {
181                        responseListener = new ResponseListener(this);
182                        responseListener.start();
183                }
184                if (eventDispatcher == null) {
185                        eventDispatcher = new EventDispatcher(this);
186                        eventDispatcher.start();
187                }
188        }
189
190        /**
191         * This method sends an event with its information
192         *
193         * @param event
194         * @throws IOException
195         * @throws SerializerException
196         */
197        public void trigger(Event event) throws IOException, SerializerException {
198                String UID = event.getTopic();
199                EventWrapper wrapper = new EventWrapper(event);
200                logger.debug("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
201                channel.exchangeDeclare(UID, "fanout");
202
203                byte[] bytesResponse = serializer.serialize(wrapper);
204                channel.basicPublish(UID, "", null, bytesResponse);
205        }
206
207        /**
208         * This function is used to send a ping message to see if the connection
209         * works
210         *
211         * @param env
212         * @throws Exception
213         */
214        public void tryConnection(Properties env) throws Exception {
215                Channel channel = connection.createChannel();
216                String message = "ping";
217
218                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
219                String queueName = exchange;
220                String routingKey = "routingKey";
221
222                channel.exchangeDeclare(exchange, "direct");
223                channel.queueDeclare(queueName, false, false, false, null);
224                channel.queueBind(queueName, exchange, routingKey);
225
226                channel.basicPublish(exchange, routingKey, null, message.getBytes());
227
228                QueueingConsumer consumer = new QueueingConsumer(channel);
229
230                channel.basicConsume(queueName, true, consumer);
231                Delivery delivery = consumer.nextDelivery(1000);
232
233                channel.exchangeDelete(exchange);
234                channel.queueDelete(queueName);
235
236                channel.close();
237
238                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
239                        throw new IOException("Ping initialitzation has failed");
240                }
241        }
242
243        /**
244         * This method adds a ShutdownListener to the Broker's connection. When this
245         * connection falls, a new connection will be created and this will also
246         * have the listener.
247         */
248        private void addFaultTolerance() {
249                connection.addShutdownListener(new ShutdownListener() {
250                        @Override
251                        public void shutdownCompleted(ShutdownSignalException cause) {
252                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
253                                if (!connectionClosed)
254                                        if (cause.isHardError()) {
255                                                if (connection.isOpen()) {
256                                                        try {
257                                                                connection.close();
258                                                        } catch (IOException e) {
259                                                                e.printStackTrace();
260                                                        }
261                                                }
262                                                try {
263                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
264                                                        channel = connection.createChannel();
265                                                        addFaultTolerance();
266                                                } catch (Exception e) {
267                                                        e.printStackTrace();
268                                                }
269                                        } else {
270                                                Channel channel = (Channel) cause.getReference();
271                                                if (channel.isOpen()) {
272                                                        try {
273                                                                channel.close();
274                                                        } catch (IOException e) {
275                                                                e.printStackTrace();
276                                                        }
277                                                }
278                                        }
279                        }
280                });
281        }
282
283        public Properties getEnvironment() {
284                return environment;
285        }
286
287        public ResponseListener getResponseListener() {
288                return responseListener;
289        }
290
291        public EventDispatcher getEventDispatcher() {
292                return eventDispatcher;
293        }
294
295        public Serializer getSerializer() {
296                return serializer;
297        }
298}
Note: See TracBrowser for help on using the repository browser.