source: trunk/src/main/java/omq/common/broker/Broker.java @ 50

Last change on this file since 50 was 50, checked in by stoda, 11 years ago
File size: 8.0 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.net.URL;
5import java.util.HashMap;
6import java.util.Map;
7import java.util.Properties;
8
9import omq.Remote;
10import omq.client.listener.ResponseListener;
11import omq.client.proxy.Proxymq;
12import omq.common.event.Event;
13import omq.common.event.EventDispatcher;
14import omq.common.event.EventWrapper;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.InitBrokerException;
19import omq.exception.RemoteException;
20import omq.exception.SerializerException;
21import omq.server.RemoteObject;
22
23import org.apache.log4j.Logger;
24import org.apache.log4j.xml.DOMConfigurator;
25
26import com.rabbitmq.client.Channel;
27import com.rabbitmq.client.Connection;
28import com.rabbitmq.client.QueueingConsumer;
29import com.rabbitmq.client.QueueingConsumer.Delivery;
30import com.rabbitmq.client.ShutdownListener;
31import com.rabbitmq.client.ShutdownSignalException;
32
33public class Broker {
34
35        private static final Logger logger = Logger.getLogger(Broker.class.getName());
36
37        private static Connection connection;
38        private static Channel channel;
39        private static boolean clientStarted = false;
40        private static boolean connectionClosed = false;
41        private static Properties environment = null;
42        // TODO ask Pedro if it can be only one object in the map (an object can
43        // have multiple threads in the same broker -see environment-)
44        private static Map<String, RemoteObject> remoteObjs;
45
46        /**
47         * Initializes a new Broker with the environment called by reference
48         *
49         * @param env
50         * @throws Exception
51         */
52        public static synchronized void initBroker(Properties env) throws Exception {
53                if (environment == null) {
54
55                        // Load log4j configuration
56                        URL log4jResource = Broker.class.getResource("/log4j.xml");
57                        DOMConfigurator.configure(log4jResource);
58
59                        remoteObjs = new HashMap<String, RemoteObject>();
60                        environment = env;
61                        connection = OmqConnectionFactory.getNewConnection(env);
62                        channel = connection.createChannel();
63                        addFaultTolerance();
64                        try {
65                                tryConnection(env);
66                        } catch (Exception e) {
67                                channel.close();
68                                connection.close();
69                                throw new InitBrokerException("The connection didn't work");
70                        }
71                } else {
72                        logger.error("Broker is already started");
73                        throw new InitBrokerException("Broker is already started");
74                }
75        }
76
77        public static void stopBroker() throws Exception {
78                logger.warn("Stopping broker");
79                // Stop the client
80                if (clientStarted) {
81                        ResponseListener.stopResponseListner();
82                        EventDispatcher.stopEventDispatcher();
83                        Proxymq.stopProxy();
84                }
85                // Stop all the remote objects working
86                for (String reference : remoteObjs.keySet()) {
87                        unbind(reference);
88                }
89
90                // Close the connection once all the listeners are died
91                closeConnection();
92
93                clientStarted = false;
94                connectionClosed = false;
95                environment = null;
96                remoteObjs = null;
97                Serializer.removeSerializers();
98        }
99
100        /**
101         * @return Broker's connection
102         * @throws Exception
103         */
104        public static Connection getConnection() throws Exception {
105                return connection;
106        }
107
108        public static void closeConnection() throws IOException {
109                logger.warn("Clossing connection");
110                connectionClosed = true;
111                connection.close();
112                connectionClosed = false;
113        }
114
115        /**
116         *
117         * @return Broker's channel
118         * @throws Exception
119         */
120        public static Channel getChannel() throws Exception {
121                return channel;
122        }
123
124        /**
125         * Creates a new channel using the Broker's connection
126         *
127         * @return newChannel
128         * @throws IOException
129         */
130        public static Channel getNewChannel() throws IOException {
131                return connection.createChannel();
132        }
133
134        @SuppressWarnings("unchecked")
135        public static <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
136                try {
137
138                        if (!clientStarted) {
139                                initClient(environment);
140                                clientStarted = true;
141                        }
142
143                        if (!Proxymq.containsProxy(reference)) {
144                                Proxymq proxy = new Proxymq(reference, contract, environment);
145                                Class<?>[] array = { contract };
146                                return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
147                        }
148                        return (T) Proxymq.getInstance(reference);
149
150                } catch (Exception e) {
151                        throw new RemoteException(e);
152                }
153        }
154
155        public static void bind(String reference, RemoteObject remote) throws RemoteException {
156                try {
157                        remote.startRemoteObject(reference, environment);
158                        remoteObjs.put(reference, remote);
159                } catch (Exception e) {
160                        throw new RemoteException(e);
161                }
162        }
163
164        public static void unbind(String reference) throws RemoteException, IOException {
165                if (remoteObjs.containsKey(reference)) {
166                        RemoteObject remote = remoteObjs.get(reference);
167                        remote.kill();
168                } else {
169                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
170                }
171
172        }
173
174        public void rebind(String name, Remote obj) throws RemoteException {
175
176        }
177
178        /**
179         * This method ensures the client will have only one ResponseListener and
180         * only one EventDispatcher. Both with the same environment.
181         *
182         * @param environment
183         * @throws Exception
184         */
185        private static synchronized void initClient(Properties environment) throws Exception {
186                if (ResponseListener.isVoid()) {
187                        ResponseListener.init(environment);
188                }
189                if (EventDispatcher.isVoid()) {
190                        EventDispatcher.init(environment);
191                }
192        }
193
194        /**
195         * This method sends an event with its information
196         *
197         * @param event
198         * @throws IOException
199         * @throws SerializerException
200         */
201        public static void trigger(Event event) throws IOException, SerializerException {
202                String UID = event.getTopic();
203                EventWrapper wrapper = new EventWrapper(event);
204                logger.debug("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
205                channel.exchangeDeclare(UID, "fanout");
206
207                byte[] bytesResponse = Serializer.serialize(wrapper);
208                channel.basicPublish(UID, "", null, bytesResponse);
209
210                // Log.saveLog("Server-Serialize", bytesResponse);
211        }
212
213        /**
214         * This function is used to send a ping message to see if the connection
215         * works
216         *
217         * @param env
218         * @throws Exception
219         */
220        public static void tryConnection(Properties env) throws Exception {
221                Channel channel = connection.createChannel();
222                String message = "ping";
223
224                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
225                String queueName = exchange;
226                String routingKey = "routingKey";
227
228                channel.exchangeDeclare(exchange, "direct");
229                channel.queueDeclare(queueName, false, false, false, null);
230                channel.queueBind(queueName, exchange, routingKey);
231
232                channel.basicPublish(exchange, routingKey, null, message.getBytes());
233
234                QueueingConsumer consumer = new QueueingConsumer(channel);
235
236                channel.basicConsume(queueName, true, consumer);
237                Delivery delivery = consumer.nextDelivery(1000);
238
239                channel.exchangeDelete(exchange);
240                channel.queueDelete(queueName);
241
242                channel.close();
243
244                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
245                        throw new IOException("Ping initialitzation has failed");
246                }
247        }
248
249        /**
250         * This method adds a ShutdownListener to the Broker's connection. When this
251         * connection falls, a new connection will be created and this will also
252         * have the listener.
253         */
254        private static void addFaultTolerance() {
255                connection.addShutdownListener(new ShutdownListener() {
256                        @Override
257                        public void shutdownCompleted(ShutdownSignalException cause) {
258                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
259                                if (!connectionClosed)
260                                        if (cause.isHardError()) {
261                                                if (connection.isOpen()) {
262                                                        try {
263                                                                connection.close();
264                                                        } catch (IOException e) {
265                                                                e.printStackTrace();
266                                                        }
267                                                }
268                                                try {
269                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
270                                                        channel = connection.createChannel();
271                                                        addFaultTolerance();
272                                                } catch (Exception e) {
273                                                        e.printStackTrace();
274                                                }
275                                        } else {
276                                                Channel channel = (Channel) cause.getReference();
277                                                if (channel.isOpen()) {
278                                                        try {
279                                                                channel.close();
280                                                        } catch (IOException e) {
281                                                                e.printStackTrace();
282                                                        }
283                                                }
284                                        }
285                        }
286                });
287        }
288
289        public static Properties getEnvironment() {
290                return environment;
291        }
292
293}
Note: See TracBrowser for help on using the repository browser.