source: trunk/src/main/java/omq/common/broker/Broker.java @ 47

Last change on this file since 47 was 47, checked in by stoda, 11 years ago

Refactoring Environment class - deleted.
StopBroker? problems solved (?)
Server can receive send and receive messages in different formats.
Some tests modified

TODO: finish all the tests, add log4j

File size: 7.5 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.util.HashMap;
5import java.util.Map;
6import java.util.Properties;
7
8import omq.Remote;
9import omq.client.listener.ResponseListener;
10import omq.client.proxy.Proxymq;
11import omq.common.event.Event;
12import omq.common.event.EventDispatcher;
13import omq.common.event.EventWrapper;
14import omq.common.util.OmqConnectionFactory;
15import omq.common.util.ParameterQueue;
16import omq.common.util.Serializer;
17import omq.exception.InitBrokerException;
18import omq.exception.RemoteException;
19import omq.exception.SerializerException;
20import omq.server.RemoteObject;
21
22import com.rabbitmq.client.Channel;
23import com.rabbitmq.client.Connection;
24import com.rabbitmq.client.QueueingConsumer;
25import com.rabbitmq.client.QueueingConsumer.Delivery;
26import com.rabbitmq.client.ShutdownListener;
27import com.rabbitmq.client.ShutdownSignalException;
28
29public class Broker {
30        private static Connection connection;
31        private static Channel channel;
32        private static boolean clientStarted = false;
33        private static boolean connectionClosed = false;
34        private static Properties environment = null;
35        // TODO ask Pedro if it can be only one object in the map (an object can
36        // have multiple threads in the same broker -see environment-)
37        private static Map<String, RemoteObject> remoteObjs;
38
39        /**
40         * Initializes a new Broker with the environment called by reference
41         *
42         * @param env
43         * @throws Exception
44         */
45        public static synchronized void initBroker(Properties env) throws Exception {
46                if (environment == null) {
47                        remoteObjs = new HashMap<String, RemoteObject>();
48                        environment = env;
49                        connection = OmqConnectionFactory.getNewConnection(env);
50                        channel = connection.createChannel();
51                        addFaultTolerance();
52                        try {
53                                tryConnection(env);
54                        } catch (Exception e) {
55                                channel.close();
56                                connection.close();
57                                throw new InitBrokerException("The connection didn't work");
58                        }
59                } else {
60                        throw new InitBrokerException("Broker already started");
61                }
62        }
63
64        public static void stopBroker() throws Exception {
65                // Stop the client
66                if (clientStarted) {
67                        ResponseListener.stopResponseListner();
68                        EventDispatcher.stopEventDispatcher();
69                        Proxymq.stopProxy();
70                }
71                // Stop all the remote objects working
72                for (String reference : remoteObjs.keySet()) {
73                        unbind(reference);
74                }
75
76                // Close the connection once all the listeners are died
77                closeConnection();
78
79                clientStarted = false;
80                connectionClosed = false;
81                environment = null;
82                remoteObjs = null;
83                Serializer.removeSerializers();
84        }
85
86        /**
87         * @return Broker's connection
88         * @throws Exception
89         */
90        public static Connection getConnection() throws Exception {
91                return connection;
92        }
93
94        public static void closeConnection() throws IOException {
95                connectionClosed = true;
96                connection.close();
97                connectionClosed = false;
98        }
99
100        /**
101         *
102         * @return Broker's channel
103         * @throws Exception
104         */
105        public static Channel getChannel() throws Exception {
106                return channel;
107        }
108
109        /**
110         * Creates a new channel using the Broker's connection
111         *
112         * @return newChannel
113         * @throws IOException
114         */
115        public static Channel getNewChannel() throws IOException {
116                return connection.createChannel();
117        }
118
119        @SuppressWarnings("unchecked")
120        public static <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
121                try {
122
123                        if (!clientStarted) {
124                                initClient(environment);
125                                clientStarted = true;
126                        }
127
128                        if (!Proxymq.containsProxy(reference)) {
129                                Proxymq proxy = new Proxymq(reference, contract, environment);
130                                Class<?>[] array = { contract };
131                                return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
132                        }
133                        return (T) Proxymq.getInstance(reference);
134
135                } catch (Exception e) {
136                        throw new RemoteException(e);
137                }
138        }
139
140        public static void bind(String reference, RemoteObject remote) throws RemoteException {
141                try {
142                        remote.startRemoteObject(reference, environment);
143                        remoteObjs.put(reference, remote);
144                } catch (Exception e) {
145                        throw new RemoteException(e);
146                }
147        }
148
149        public static void unbind(String reference) throws RemoteException, IOException {
150                if (remoteObjs.containsKey(reference)) {
151                        RemoteObject remote = remoteObjs.get(reference);
152                        remote.kill();
153                } else {
154                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
155                }
156
157        }
158
159        public void rebind(String name, Remote obj) throws RemoteException {
160
161        }
162
163        /**
164         * This method ensures the client will have only one ResponseListener and
165         * only one EventDispatcher. Both with the same environment.
166         *
167         * @param environment
168         * @throws Exception
169         */
170        private static synchronized void initClient(Properties environment) throws Exception {
171                if (ResponseListener.isVoid()) {
172                        ResponseListener.init(environment);
173                }
174                if (EventDispatcher.isVoid()) {
175                        EventDispatcher.init(environment);
176                }
177        }
178
179        /**
180         * This method sends an event with its information
181         *
182         * @param event
183         * @throws IOException
184         * @throws SerializerException
185         */
186        public static void trigger(Event event) throws IOException, SerializerException {
187                String UID = event.getTopic();
188                EventWrapper wrapper = new EventWrapper(event);
189                System.out.println("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
190                channel.exchangeDeclare(UID, "fanout");
191
192                byte[] bytesResponse = Serializer.serialize(wrapper);
193                channel.basicPublish(UID, "", null, bytesResponse);
194
195                // Log.saveLog("Server-Serialize", bytesResponse);
196        }
197
198        /**
199         * This function is used to send a ping message to see if the connection
200         * works
201         *
202         * @param env
203         * @throws Exception
204         */
205        public static void tryConnection(Properties env) throws Exception {
206                Channel channel = connection.createChannel();
207                String message = "ping";
208
209                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
210                String queueName = exchange;
211                String routingKey = "routingKey";
212
213                channel.exchangeDeclare(exchange, "direct");
214                channel.queueDeclare(queueName, false, false, false, null);
215                channel.queueBind(queueName, exchange, routingKey);
216
217                channel.basicPublish(exchange, routingKey, null, message.getBytes());
218
219                QueueingConsumer consumer = new QueueingConsumer(channel);
220
221                channel.basicConsume(queueName, true, consumer);
222                Delivery delivery = consumer.nextDelivery(1000);
223
224                channel.exchangeDelete(exchange);
225                channel.queueDelete(queueName);
226
227                channel.close();
228
229                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
230                        throw new IOException("Ping initialitzation has failed");
231                }
232        }
233
234        /**
235         * This method adds a ShutdownListener to the Broker's connection. When this
236         * connection falls, a new connection will be created and this will also
237         * have the listener.
238         */
239        private static void addFaultTolerance() {
240                connection.addShutdownListener(new ShutdownListener() {
241                        @Override
242                        public void shutdownCompleted(ShutdownSignalException cause) {
243                                if (!connectionClosed)
244                                        if (cause.isHardError()) {
245                                                if (connection.isOpen()) {
246                                                        try {
247                                                                connection.close();
248                                                        } catch (IOException e) {
249                                                                e.printStackTrace();
250                                                        }
251                                                }
252                                                try {
253                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
254                                                        channel = connection.createChannel();
255                                                        addFaultTolerance();
256                                                } catch (Exception e) {
257                                                        e.printStackTrace();
258                                                }
259                                        } else {
260                                                Channel channel = (Channel) cause.getReference();
261                                                if (channel.isOpen()) {
262                                                        try {
263                                                                channel.close();
264                                                        } catch (IOException e) {
265                                                                e.printStackTrace();
266                                                        }
267                                                }
268                                        }
269                        }
270                });
271        }
272
273        public static Properties getEnvironment() {
274                return environment;
275        }
276
277}
Note: See TracBrowser for help on using the repository browser.