source: trunk/src/main/java/omq/common/broker/Broker.java @ 53

Last change on this file since 53 was 53, checked in by stoda, 11 years ago

Non static broker
TODO: change all test to see whether the new broker configuration works

File size: 7.8 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.net.URL;
5import java.util.HashMap;
6import java.util.Map;
7import java.util.Properties;
8
9import omq.Remote;
10import omq.client.listener.ResponseListener;
11import omq.client.proxy.Proxymq;
12import omq.common.event.Event;
13import omq.common.event.EventDispatcher;
14import omq.common.event.EventWrapper;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.InitBrokerException;
19import omq.exception.RemoteException;
20import omq.exception.SerializerException;
21import omq.server.RemoteObject;
22
23import org.apache.log4j.Logger;
24import org.apache.log4j.xml.DOMConfigurator;
25
26import com.rabbitmq.client.Channel;
27import com.rabbitmq.client.Connection;
28import com.rabbitmq.client.QueueingConsumer;
29import com.rabbitmq.client.QueueingConsumer.Delivery;
30import com.rabbitmq.client.ShutdownListener;
31import com.rabbitmq.client.ShutdownSignalException;
32
33public class Broker {
34
35        private static final Logger logger = Logger.getLogger(Broker.class.getName());
36
37        private Connection connection;
38        private Channel channel;
39        private ResponseListener responseListener;
40        private EventDispatcher eventDispatcher;
41        private Serializer serializer;
42        private boolean clientStarted = false;
43        private boolean connectionClosed = false;
44        private Properties environment = null;
45        private Map<String, RemoteObject> remoteObjs;
46
47        public Broker(Properties env) throws Exception {
48                // Load log4j configuration
49                URL log4jResource = Broker.class.getResource("/log4j.xml");
50                DOMConfigurator.configure(log4jResource);
51
52                remoteObjs = new HashMap<String, RemoteObject>();
53                serializer = new Serializer(env);
54                environment = env;
55                connection = OmqConnectionFactory.getNewConnection(env);
56                channel = connection.createChannel();
57                addFaultTolerance();
58                try {
59                        tryConnection(env);
60                } catch (Exception e) {
61                        channel.close();
62                        connection.close();
63                        throw new InitBrokerException("The connection didn't work");
64                }
65        }
66
67        public void stopBroker() throws Exception {
68                logger.warn("Stopping broker");
69                // Stop the client
70                if (clientStarted) {
71                        responseListener.kill();
72                        eventDispatcher.kill();
73                        Proxymq.stopProxy();
74                }
75                // Stop all the remote objects working
76                for (String reference : remoteObjs.keySet()) {
77                        unbind(reference);
78                }
79
80                // Close the connection once all the listeners are died
81                closeConnection();
82
83                clientStarted = false;
84                connectionClosed = false;
85                environment = null;
86                remoteObjs = null;
87                // Serializer.removeSerializers();
88        }
89
90        /**
91         * @return Broker's connection
92         * @throws Exception
93         */
94        public Connection getConnection() throws Exception {
95                return connection;
96        }
97
98        public void closeConnection() throws IOException {
99                logger.warn("Clossing connection");
100                connectionClosed = true;
101                connection.close();
102                connectionClosed = false;
103        }
104
105        /**
106         *
107         * @return Broker's channel
108         * @throws Exception
109         */
110        public Channel getChannel() throws Exception {
111                return channel;
112        }
113
114        /**
115         * Creates a new channel using the Broker's connection
116         *
117         * @return newChannel
118         * @throws IOException
119         */
120        public Channel getNewChannel() throws IOException {
121                return connection.createChannel();
122        }
123
124        @SuppressWarnings("unchecked")
125        public <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
126                try {
127
128                        if (!clientStarted) {
129                                initClient(environment);
130                                clientStarted = true;
131                        }
132
133                        if (!Proxymq.containsProxy(reference)) {
134                                Proxymq proxy = new Proxymq(reference, contract, this);
135                                Class<?>[] array = { contract };
136                                return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
137                        }
138                        return (T) Proxymq.getInstance(reference);
139
140                } catch (Exception e) {
141                        throw new RemoteException(e);
142                }
143        }
144
145        public void bind(String reference, RemoteObject remote) throws RemoteException {
146                try {
147                        remote.startRemoteObject(reference, this);
148                        remoteObjs.put(reference, remote);
149                } catch (Exception e) {
150                        throw new RemoteException(e);
151                }
152        }
153
154        public void unbind(String reference) throws RemoteException, IOException {
155                if (remoteObjs.containsKey(reference)) {
156                        RemoteObject remote = remoteObjs.get(reference);
157                        remote.kill();
158                } else {
159                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
160                }
161
162        }
163
164        public void rebind(String name, Remote obj) throws RemoteException {
165
166        }
167
168        /**
169         * This method ensures the client will have only one ResponseListener and
170         * only one EventDispatcher. Both with the same environment.
171         *
172         * @param environment
173         * @throws Exception
174         */
175        private synchronized void initClient(Properties environment) throws Exception {
176                if (responseListener == null) {
177                        responseListener = new ResponseListener(this);
178                }
179                if (eventDispatcher == null) {
180                        eventDispatcher = new EventDispatcher(this);
181                }
182        }
183
184        /**
185         * This method sends an event with its information
186         *
187         * @param event
188         * @throws IOException
189         * @throws SerializerException
190         */
191        public void trigger(Event event) throws IOException, SerializerException {
192                String UID = event.getTopic();
193                EventWrapper wrapper = new EventWrapper(event);
194                logger.debug("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
195                channel.exchangeDeclare(UID, "fanout");
196
197                byte[] bytesResponse = serializer.serialize(wrapper);
198                channel.basicPublish(UID, "", null, bytesResponse);
199
200                // Log.saveLog("Server-Serialize", bytesResponse);
201        }
202
203        /**
204         * This function is used to send a ping message to see if the connection
205         * works
206         *
207         * @param env
208         * @throws Exception
209         */
210        public void tryConnection(Properties env) throws Exception {
211                Channel channel = connection.createChannel();
212                String message = "ping";
213
214                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
215                String queueName = exchange;
216                String routingKey = "routingKey";
217
218                channel.exchangeDeclare(exchange, "direct");
219                channel.queueDeclare(queueName, false, false, false, null);
220                channel.queueBind(queueName, exchange, routingKey);
221
222                channel.basicPublish(exchange, routingKey, null, message.getBytes());
223
224                QueueingConsumer consumer = new QueueingConsumer(channel);
225
226                channel.basicConsume(queueName, true, consumer);
227                Delivery delivery = consumer.nextDelivery(1000);
228
229                channel.exchangeDelete(exchange);
230                channel.queueDelete(queueName);
231
232                channel.close();
233
234                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
235                        throw new IOException("Ping initialitzation has failed");
236                }
237        }
238
239        /**
240         * This method adds a ShutdownListener to the Broker's connection. When this
241         * connection falls, a new connection will be created and this will also
242         * have the listener.
243         */
244        private void addFaultTolerance() {
245                connection.addShutdownListener(new ShutdownListener() {
246                        @Override
247                        public void shutdownCompleted(ShutdownSignalException cause) {
248                                logger.warn("Shutdown message received. Cause: " + cause.getMessage());
249                                if (!connectionClosed)
250                                        if (cause.isHardError()) {
251                                                if (connection.isOpen()) {
252                                                        try {
253                                                                connection.close();
254                                                        } catch (IOException e) {
255                                                                e.printStackTrace();
256                                                        }
257                                                }
258                                                try {
259                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
260                                                        channel = connection.createChannel();
261                                                        addFaultTolerance();
262                                                } catch (Exception e) {
263                                                        e.printStackTrace();
264                                                }
265                                        } else {
266                                                Channel channel = (Channel) cause.getReference();
267                                                if (channel.isOpen()) {
268                                                        try {
269                                                                channel.close();
270                                                        } catch (IOException e) {
271                                                                e.printStackTrace();
272                                                        }
273                                                }
274                                        }
275                        }
276                });
277        }
278
279        public Properties getEnvironment() {
280                return environment;
281        }
282
283        public ResponseListener getResponseListener() {
284                return responseListener;
285        }
286
287        public EventDispatcher getEventDispatcher() {
288                return eventDispatcher;
289        }
290
291        public Serializer getSerializer() {
292                return serializer;
293        }
294}
Note: See TracBrowser for help on using the repository browser.