source: branches/objectmq-1.0/src/omq/common/broker/Broker.java

Last change on this file was 33, checked in by amoreno, 11 years ago

new release version

File size: 5.3 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.util.Properties;
5
6import omq.Remote;
7import omq.client.proxy.Proxymq;
8import omq.client.remote.response.ResponseListener;
9import omq.common.event.Event;
10import omq.common.event.EventDispatcher;
11import omq.common.event.EventWrapper;
12import omq.common.util.Environment;
13import omq.common.util.OmqConnectionFactory;
14import omq.common.util.ParameterQueue;
15import omq.common.util.Serializer;
16import omq.exception.EnvironmentException;
17import omq.exception.InitBrokerException;
18import omq.exception.RemoteException;
19import omq.exception.SerializerException;
20import omq.server.remote.request.RemoteObject;
21
22import com.rabbitmq.client.Channel;
23import com.rabbitmq.client.Connection;
24import com.rabbitmq.client.QueueingConsumer;
25import com.rabbitmq.client.QueueingConsumer.Delivery;
26import com.rabbitmq.client.ShutdownListener;
27import com.rabbitmq.client.ShutdownSignalException;
28
29public class Broker {
30        private static Connection connection;
31        private static Channel channel;
32        private static boolean clientStarted = false;
33
34        public static void initBroker(Properties env) throws Exception {
35                try {
36                        Environment.getEnvironment();
37                } catch (EnvironmentException ex) { // environment not set.
38                        Environment.setEnvironment(env);
39                        connection = OmqConnectionFactory.getNewConnection(env);
40                        channel = connection.createChannel();
41                        addFaultTolerance();
42                        try {
43                                tryConnection(env);
44                        } catch (Exception e) {
45                                channel.close();
46                                connection.close();
47                                throw new InitBrokerException("The connection didn't work");
48                        }
49                }
50        }
51
52        // TODO: what happens if the connection is not set
53        public static Connection getConnection() throws Exception {
54                return connection;
55        }
56
57        public static Channel getChannel() throws Exception {
58                return channel;
59        }
60
61        public static Channel getNewChannel() throws IOException {
62                return connection.createChannel();
63        }
64
65        public static Remote lookup(String reference, Class<?> contract) throws RemoteException {
66                try {
67                        Properties environment = Environment.getEnvironment();
68
69                        if (!clientStarted) {
70                                initClient(environment);
71                                clientStarted = true;
72                        }
73
74                        if (!Proxymq.containsProxy(reference)) {
75                                Proxymq proxy = new Proxymq(reference, contract, environment);
76                                Class<?>[] array = { contract };
77                                return (Remote) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
78                        }
79                        return (Remote) Proxymq.getInstance(reference);
80
81                } catch (Exception e) {
82                        throw new RemoteException(e);
83                }
84        }
85
86        public static void bind(String reference, RemoteObject remote) throws RemoteException {
87                try {
88                        Properties environment = Environment.getEnvironment();
89                        remote.startRemoteObject(reference, environment);
90                } catch (Exception e) {
91                        throw new RemoteException(e);
92                }
93        }
94
95        public static void unbind(String reference) throws RemoteException {
96
97        }
98
99        public void rebind(String name, Remote obj) throws RemoteException {
100
101        }
102
103        private static synchronized void initClient(Properties environment) throws Exception {
104                if (ResponseListener.isVoid()) {
105                        ResponseListener.init(environment);
106                }
107                if (EventDispatcher.isVoid()) {
108                        EventDispatcher.init(environment);
109                }
110        }
111
112        public static void trigger(Event event) throws IOException, SerializerException {
113                String UID = event.getTopic();
114                EventWrapper wrapper = new EventWrapper(event);
115                System.out.println("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
116                channel.exchangeDeclare(UID, "fanout");
117               
118                byte[] bytesResponse = Serializer.serialize(wrapper);
119                channel.basicPublish(UID, "", null, bytesResponse);
120
121                //Log.saveLog("Server-Serialize", bytesResponse);                       
122        }
123
124        public static void tryConnection(Properties env) throws Exception {
125                Channel channel = connection.createChannel();
126                String message = "ping";
127
128                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
129                String queueName = exchange;
130                String routingKey = "routingKey";
131
132                channel.exchangeDeclare(exchange, "direct");
133                channel.queueDeclare(queueName, false, false, false, null);
134                channel.queueBind(queueName, exchange, routingKey);
135
136                channel.basicPublish(exchange, routingKey, null, message.getBytes());
137
138                QueueingConsumer consumer = new QueueingConsumer(channel);
139
140                channel.basicConsume(queueName, true, consumer);
141                Delivery delivery = consumer.nextDelivery(1000);
142
143                channel.exchangeDelete(exchange);
144                channel.queueDelete(queueName);
145
146                channel.close();
147
148                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
149                        throw new IOException("Ping-pong initialitzation has failed");
150                }
151        }
152
153        private static void addFaultTolerance() {
154                connection.addShutdownListener(new ShutdownListener() {
155                        @Override
156                        public void shutdownCompleted(ShutdownSignalException cause) {
157
158                                if (cause.isHardError()) {
159                                        if (connection.isOpen()) {
160                                                try {
161                                                        connection.close();
162                                                } catch (IOException e) {
163                                                        e.printStackTrace();
164                                                }
165                                        }
166                                        try {
167                                                Properties env = Environment.getEnvironment();
168                                                connection = OmqConnectionFactory.getNewWorkingConnection(env);
169                                                channel = connection.createChannel();
170                                                addFaultTolerance();
171                                        } catch (Exception e) {
172                                                e.printStackTrace();
173                                        }
174                                } else {
175                                        Channel channel = (Channel) cause.getReference();
176                                        if (channel.isOpen()) {
177                                                try {
178                                                        channel.close();
179                                                } catch (IOException e) {
180                                                        e.printStackTrace();
181                                                }
182                                        }
183                                }
184                        }
185                });
186        }
187
188}
Note: See TracBrowser for help on using the repository browser.