source: trunk/src/main/java/omq/common/broker/Broker.java @ 44

Last change on this file since 44 was 44, checked in by stoda, 11 years ago

Objectmq converted to maven project

File size: 7.4 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.util.HashMap;
5import java.util.Map;
6import java.util.Properties;
7
8import omq.Remote;
9import omq.client.listener.ResponseListener;
10import omq.client.proxy.Proxymq;
11import omq.common.event.Event;
12import omq.common.event.EventDispatcher;
13import omq.common.event.EventWrapper;
14import omq.common.util.Environment;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.InitBrokerException;
19import omq.exception.RemoteException;
20import omq.exception.SerializerException;
21import omq.server.RemoteObject;
22
23import com.rabbitmq.client.Channel;
24import com.rabbitmq.client.Connection;
25import com.rabbitmq.client.QueueingConsumer;
26import com.rabbitmq.client.QueueingConsumer.Delivery;
27import com.rabbitmq.client.ShutdownListener;
28import com.rabbitmq.client.ShutdownSignalException;
29
30public class Broker {
31        private static Connection connection;
32        private static Channel channel;
33        private static boolean clientStarted = false;
34        private static boolean connectionClosed = false;
35        // TODO ask Pedro if it can be only one object in the map (an object can
36        // have multiple threads in the same broker -see environment-)
37        private static Map<String, RemoteObject> remoteObjs;
38
39        /**
40         * Initializes a new Broker with the environment called by reference
41         *
42         * @param env
43         * @throws Exception
44         */
45        public static synchronized void initBroker(Properties env) throws Exception {
46                if (Environment.isVoid()) {
47                        remoteObjs = new HashMap<String, RemoteObject>();
48                        Environment.setEnvironment(env);
49                        connection = OmqConnectionFactory.getNewConnection(env);
50                        channel = connection.createChannel();
51                        addFaultTolerance();
52                        try {
53                                tryConnection(env);
54                        } catch (Exception e) {
55                                channel.close();
56                                connection.close();
57                                throw new InitBrokerException("The connection didn't work");
58                        }
59                } else {
60                        throw new InitBrokerException("Broker already started");
61                }
62        }
63
64        public static void stopBroker() throws Exception {
65                // Stop the client
66                if (clientStarted) {
67                        ResponseListener.stopResponseListner();
68                        EventDispatcher.stopEventDispatcher();
69                }
70                // Stop all the remote objects working
71                for (String reference : remoteObjs.keySet()) {
72                        unbind(reference);
73                }
74                // Close the connection once all the listeners are died
75                closeConnection();
76        }
77
78        /**
79         * @return Broker's connection
80         * @throws Exception
81         */
82        public static Connection getConnection() throws Exception {
83                return connection;
84        }
85
86        public static void closeConnection() throws IOException {
87                connectionClosed = true;
88                connection.close();
89        }
90
91        /**
92         *
93         * @return Broker's channel
94         * @throws Exception
95         */
96        public static Channel getChannel() throws Exception {
97                return channel;
98        }
99
100        /**
101         * Creates a new channel using the Broker's connection
102         *
103         * @return newChannel
104         * @throws IOException
105         */
106        public static Channel getNewChannel() throws IOException {
107                return connection.createChannel();
108        }
109
110        @SuppressWarnings("unchecked")
111        public static <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
112                try {
113                        Properties environment = Environment.getEnvironment();
114
115                        if (!clientStarted) {
116                                initClient(environment);
117                                clientStarted = true;
118                        }
119
120                        if (!Proxymq.containsProxy(reference)) {
121                                Proxymq proxy = new Proxymq(reference, contract, environment);
122                                Class<?>[] array = { contract };
123                                return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
124                        }
125                        return (T) Proxymq.getInstance(reference);
126
127                } catch (Exception e) {
128                        throw new RemoteException(e);
129                }
130        }
131
132        public static void bind(String reference, RemoteObject remote) throws RemoteException {
133                try {
134                        Properties environment = Environment.getEnvironment();
135                        remote.startRemoteObject(reference, environment);
136                        remoteObjs.put(reference, remote);
137                } catch (Exception e) {
138                        throw new RemoteException(e);
139                }
140        }
141
142        public static void unbind(String reference) throws RemoteException, IOException {
143                if (remoteObjs.containsKey(reference)) {
144                        RemoteObject remote = remoteObjs.get(reference);
145                        remote.kill();
146                } else {
147                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
148                }
149
150        }
151
152        public void rebind(String name, Remote obj) throws RemoteException {
153
154        }
155
156        /**
157         * This method ensures the client will have only one ResponseListener and
158         * only one EventDispatcher. Both with the same environment.
159         *
160         * @param environment
161         * @throws Exception
162         */
163        private static synchronized void initClient(Properties environment) throws Exception {
164                if (ResponseListener.isVoid()) {
165                        ResponseListener.init(environment);
166                }
167                if (EventDispatcher.isVoid()) {
168                        EventDispatcher.init(environment);
169                }
170        }
171
172        /**
173         * This method sends an event with its information
174         *
175         * @param event
176         * @throws IOException
177         * @throws SerializerException
178         */
179        public static void trigger(Event event) throws IOException, SerializerException {
180                String UID = event.getTopic();
181                EventWrapper wrapper = new EventWrapper(event);
182                System.out.println("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
183                channel.exchangeDeclare(UID, "fanout");
184
185                byte[] bytesResponse = Serializer.serialize(wrapper);
186                channel.basicPublish(UID, "", null, bytesResponse);
187
188                // Log.saveLog("Server-Serialize", bytesResponse);
189        }
190
191        /**
192         * This function is used to send a ping message to see if the connection
193         * works
194         *
195         * @param env
196         * @throws Exception
197         */
198        public static void tryConnection(Properties env) throws Exception {
199                Channel channel = connection.createChannel();
200                String message = "ping";
201
202                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
203                String queueName = exchange;
204                String routingKey = "routingKey";
205
206                channel.exchangeDeclare(exchange, "direct");
207                channel.queueDeclare(queueName, false, false, false, null);
208                channel.queueBind(queueName, exchange, routingKey);
209
210                channel.basicPublish(exchange, routingKey, null, message.getBytes());
211
212                QueueingConsumer consumer = new QueueingConsumer(channel);
213
214                channel.basicConsume(queueName, true, consumer);
215                Delivery delivery = consumer.nextDelivery(1000);
216
217                channel.exchangeDelete(exchange);
218                channel.queueDelete(queueName);
219
220                channel.close();
221
222                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
223                        throw new IOException("Ping initialitzation has failed");
224                }
225        }
226
227        /**
228         * This method adds a ShutdownListener to the Broker's connection. When this
229         * connection falls, a new connection will be created and this will also
230         * have the listener.
231         */
232        private static void addFaultTolerance() {
233                connection.addShutdownListener(new ShutdownListener() {
234                        @Override
235                        public void shutdownCompleted(ShutdownSignalException cause) {
236                                if (!connectionClosed)
237                                        if (cause.isHardError()) {
238                                                if (connection.isOpen()) {
239                                                        try {
240                                                                connection.close();
241                                                        } catch (IOException e) {
242                                                                e.printStackTrace();
243                                                        }
244                                                }
245                                                try {
246                                                        Properties env = Environment.getEnvironment();
247                                                        connection = OmqConnectionFactory.getNewWorkingConnection(env);
248                                                        channel = connection.createChannel();
249                                                        addFaultTolerance();
250                                                } catch (Exception e) {
251                                                        e.printStackTrace();
252                                                }
253                                        } else {
254                                                Channel channel = (Channel) cause.getReference();
255                                                if (channel.isOpen()) {
256                                                        try {
257                                                                channel.close();
258                                                        } catch (IOException e) {
259                                                                e.printStackTrace();
260                                                        }
261                                                }
262                                        }
263                        }
264                });
265        }
266
267}
Note: See TracBrowser for help on using the repository browser.