source: branches/objectmq_old/src/omq/common/broker/Broker.java

Last change on this file was 39, checked in by stoda, 11 years ago

Exception test revised.
Broker.lookup does not need the casting
GsonImp? arguments problem solved
MultiProcessTest? added

File size: 7.4 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.util.HashMap;
5import java.util.Map;
6import java.util.Properties;
7
8import omq.Remote;
9import omq.client.listener.ResponseListener;
10import omq.client.proxy.Proxymq;
11import omq.common.event.Event;
12import omq.common.event.EventDispatcher;
13import omq.common.event.EventWrapper;
14import omq.common.util.Environment;
15import omq.common.util.OmqConnectionFactory;
16import omq.common.util.ParameterQueue;
17import omq.common.util.Serializer;
18import omq.exception.InitBrokerException;
19import omq.exception.RemoteException;
20import omq.exception.SerializerException;
21import omq.server.RemoteObject;
22
23import com.rabbitmq.client.Channel;
24import com.rabbitmq.client.Connection;
25import com.rabbitmq.client.QueueingConsumer;
26import com.rabbitmq.client.QueueingConsumer.Delivery;
27import com.rabbitmq.client.ShutdownListener;
28import com.rabbitmq.client.ShutdownSignalException;
29
30public class Broker {
31        private static Connection connection;
32        private static Channel channel;
33        private static boolean clientStarted = false;
34        private static boolean connectionClosed = false;
35        // TODO ask Pedro if it can be only one object in the map (an object can
36        // have multiple threads in the same broker -see environment-)
37        private static Map<String, RemoteObject> remoteObjs;
38
39        /**
40         * Initializes a new Broker with the environment called by reference
41         *
42         * @param env
43         * @throws Exception
44         */
45        public static synchronized void initBroker(Properties env) throws Exception {
46                if (Environment.isVoid()) {
47                        remoteObjs = new HashMap<String, RemoteObject>();
48                        Environment.setEnvironment(env);
49                        connection = OmqConnectionFactory.getNewConnection(env);
50                        channel = connection.createChannel();
51                        addFaultTolerance();
52                        try {
53                                tryConnection(env);
54                        } catch (Exception e) {
55                                channel.close();
56                                connection.close();
57                                throw new InitBrokerException("The connection didn't work");
58                        }
59                } else {
60                        throw new InitBrokerException("Broker already started");
61                }
62        }
63
64        public static void stopBroker() throws Exception {
65                // Stop the client
66                if (clientStarted) {
67                        ResponseListener.stopResponseListner();
68                        EventDispatcher.stopEventDispatcher();
69                }
70                // Stop all the remote objects working
71                for (String reference : remoteObjs.keySet()) {
72                        unbind(reference);
73                }
74                // Close the connection once all the listeners are died
75                closeConnection();
76        }
77
78        /**
79         * @return Broker's connection
80         * @throws Exception
81         */
82        public static Connection getConnection() throws Exception {
83                return connection;
84        }
85
86        public static void closeConnection() throws IOException {
87                connectionClosed = true;
88                connection.close();
89        }
90
91        /**
92         *
93         * @return Broker's channel
94         * @throws Exception
95         */
96        public static Channel getChannel() throws Exception {
97                return channel;
98        }
99
100        /**
101         * Creates a new channel using the Broker's connection
102         *
103         * @return newChannel
104         * @throws IOException
105         */
106        public static Channel getNewChannel() throws IOException {
107                return connection.createChannel();
108        }
109
110        @SuppressWarnings("unchecked")
111        public static <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
112                try {
113                        Properties environment = Environment.getEnvironment();
114
115                        if (!clientStarted) {
116                                initClient(environment);
117                                clientStarted = true;
118                        }
119
120                        if (!Proxymq.containsProxy(reference)) {
121                                Proxymq proxy = new Proxymq(reference, contract, environment);
122                                Class<?>[] array = { contract };
123                                return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
124                        }
125                        return (T) Proxymq.getInstance(reference);
126
127                } catch (Exception e) {
128                        throw new RemoteException(e);
129                }
130        }
131
132        public static void bind(String reference, RemoteObject remote) throws RemoteException {
133                try {
134                        Properties environment = Environment.getEnvironment();
135                        remote.startRemoteObject(reference, environment);
136                        remoteObjs.put(reference, remote);
137                } catch (Exception e) {
138                        throw new RemoteException(e);
139                }
140        }
141
142        public static void unbind(String reference) throws RemoteException, IOException {
143                if (remoteObjs.containsKey(reference)) {
144                        RemoteObject remote = remoteObjs.get(reference);
145                        remote.kill();
146                } else {
147                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
148                }
149
150        }
151
152        public void rebind(String name, Remote obj) throws RemoteException {
153
154        }
155
156        /**
157         * This method ensures the client will have only one ResponseListener and
158         * only one EventDispatcher. Both with the same environment.
159         *
160         * @param environment
161         * @throws Exception
162         */
163        private static synchronized void initClient(Properties environment) throws Exception {
164                if (ResponseListener.isVoid()) {
165                        ResponseListener.init(environment);
166                }
167                if (EventDispatcher.isVoid()) {
168                        EventDispatcher.init(environment);
169                }
170        }
171
172        /**
173         * This method sends an event with its information
174         *
175         * @param event
176         * @throws IOException
177         * @throws SerializerException
178         */
179        public static void trigger(Event event) throws IOException, SerializerException {
180                String UID = event.getTopic();
181                EventWrapper wrapper = new EventWrapper(event);
182                System.out.println("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
183                channel.exchangeDeclare(UID, "fanout");
184
185                byte[] bytesResponse = Serializer.serialize(wrapper);
186                channel.basicPublish(UID, "", null, bytesResponse);
187
188                // Log.saveLog("Server-Serialize", bytesResponse);
189        }
190
191        /**
192         * This function is used to send a ping message to see if the connection
193         * works
194         *
195         * @param env
196         * @throws Exception
197         */
198        public static void tryConnection(Properties env) throws Exception {
199                Channel channel = connection.createChannel();
200                String message = "ping";
201
202                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
203                String queueName = exchange;
204                String routingKey = "routingKey";
205
206                channel.exchangeDeclare(exchange, "direct");
207                channel.queueDeclare(queueName, false, false, false, null);
208                channel.queueBind(queueName, exchange, routingKey);
209
210                channel.basicPublish(exchange, routingKey, null, message.getBytes());
211
212                QueueingConsumer consumer = new QueueingConsumer(channel);
213
214                channel.basicConsume(queueName, true, consumer);
215                Delivery delivery = consumer.nextDelivery(1000);
216
217                channel.exchangeDelete(exchange);
218                channel.queueDelete(queueName);
219
220                channel.close();
221
222                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
223                        throw new IOException("Ping initialitzation has failed");
224                }
225        }
226
227        /**
228         * This method adds a ShutdownListener to the Broker's connection. When this
229         * connection falls, a new connection will be created and this will also
230         * have the listener.
231         */
232        private static void addFaultTolerance() {
233                connection.addShutdownListener(new ShutdownListener() {
234                        @Override
235                        public void shutdownCompleted(ShutdownSignalException cause) {
236                                if (!connectionClosed)
237                                        if (cause.isHardError()) {
238                                                if (connection.isOpen()) {
239                                                        try {
240                                                                connection.close();
241                                                        } catch (IOException e) {
242                                                                e.printStackTrace();
243                                                        }
244                                                }
245                                                try {
246                                                        Properties env = Environment.getEnvironment();
247                                                        connection = OmqConnectionFactory.getNewWorkingConnection(env);
248                                                        channel = connection.createChannel();
249                                                        addFaultTolerance();
250                                                } catch (Exception e) {
251                                                        e.printStackTrace();
252                                                }
253                                        } else {
254                                                Channel channel = (Channel) cause.getReference();
255                                                if (channel.isOpen()) {
256                                                        try {
257                                                                channel.close();
258                                                        } catch (IOException e) {
259                                                                e.printStackTrace();
260                                                        }
261                                                }
262                                        }
263                        }
264                });
265        }
266
267}
Note: See TracBrowser for help on using the repository browser.