source: trunk/objectmq/src/omq/common/broker/Broker.java @ 29

Last change on this file since 29 was 29, checked in by stoda, 11 years ago

Singleton didn't worked in parallel threads solved adding synchronized.
Num threads by property added.
Example with different threads added.

File size: 5.2 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.util.Properties;
5
6import omq.Remote;
7import omq.client.proxy.Proxymq;
8import omq.client.remote.response.ResponseListener;
9import omq.common.event.Event;
10import omq.common.event.EventDispatcher;
11import omq.common.event.EventWrapper;
12import omq.common.util.Environment;
13import omq.common.util.OmqConnectionFactory;
14import omq.common.util.ParameterQueue;
15import omq.common.util.Serializer;
16import omq.exception.EnvironmentException;
17import omq.exception.InitBrokerException;
18import omq.exception.RemoteException;
19import omq.exception.SerializerException;
20import omq.server.remote.request.RemoteObject;
21
22import com.rabbitmq.client.Channel;
23import com.rabbitmq.client.Connection;
24import com.rabbitmq.client.QueueingConsumer;
25import com.rabbitmq.client.QueueingConsumer.Delivery;
26import com.rabbitmq.client.ShutdownListener;
27import com.rabbitmq.client.ShutdownSignalException;
28
29public class Broker {
30        private static Connection connection;
31        private static Channel channel;
32        private static boolean clientStarted = false;
33
34        public static void initBroker(Properties env) throws Exception {
35                try {
36                        Environment.getEnvironment();
37                } catch (EnvironmentException ex) { // environment not set.
38                        Environment.setEnvironment(env);
39                        connection = OmqConnectionFactory.getNewConnection(env);
40                        channel = connection.createChannel();
41                        addFaultTolerance();
42                        try {
43                                tryConnection(env);
44                        } catch (Exception e) {
45                                channel.close();
46                                connection.close();
47                                throw new InitBrokerException("The connection didn't work");
48                        }
49                }
50        }
51
52        // TODO: what happens if the connection is not set
53        public static Connection getConnection() throws Exception {
54                return connection;
55        }
56
57        public static Channel getChannel() throws Exception {
58                return channel;
59        }
60
61        public static Channel getNewChannel() throws IOException {
62                return connection.createChannel();
63        }
64
65        public static Remote lookup(String reference, Class<?> contract) throws RemoteException {
66                try {
67                        Properties environment = Environment.getEnvironment();
68
69                        if (!clientStarted) {
70                                initClient(environment);
71                                clientStarted = true;
72                        }
73
74                        if (!Proxymq.containsProxy(reference)) {
75                                Proxymq proxy = new Proxymq(reference, contract, environment);
76                                Class<?>[] array = { contract };
77                                return (Remote) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
78                        }
79                        return (Remote) Proxymq.getInstance(reference);
80
81                } catch (Exception e) {
82                        throw new RemoteException(e);
83                }
84        }
85
86        public static void bind(String reference, RemoteObject remote) throws RemoteException {
87                try {
88                        Properties environment = Environment.getEnvironment();
89                        remote.startRemoteObject(reference, environment);
90                } catch (Exception e) {
91                        throw new RemoteException(e);
92                }
93        }
94
95        public static void unbind(String reference) throws RemoteException {
96
97        }
98
99        public void rebind(String name, Remote obj) throws RemoteException {
100
101        }
102
103        private static synchronized void initClient(Properties environment) throws Exception {
104                if (ResponseListener.isVoid()) {
105                        ResponseListener.init(environment);
106                }
107                if (EventDispatcher.isVoid()) {
108                        EventDispatcher.init(environment);
109                }
110        }
111
112        public static void trigger(Event event) throws IOException, SerializerException {
113                String UID = event.getTopic();
114                EventWrapper wrapper = new EventWrapper(event);
115                System.out.println("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
116                channel.exchangeDeclare(UID, "fanout");
117                channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));
118        }
119
120        public static void tryConnection(Properties env) throws Exception {
121                Channel channel = connection.createChannel();
122                String message = "ping";
123
124                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
125                String queueName = exchange;
126                String routingKey = "routingKey";
127
128                channel.exchangeDeclare(exchange, "direct");
129                channel.queueDeclare(queueName, false, false, false, null);
130                channel.queueBind(queueName, exchange, routingKey);
131
132                channel.basicPublish(exchange, routingKey, null, message.getBytes());
133
134                QueueingConsumer consumer = new QueueingConsumer(channel);
135
136                channel.basicConsume(queueName, true, consumer);
137                Delivery delivery = consumer.nextDelivery(1000);
138
139                channel.exchangeDelete(exchange);
140                channel.queueDelete(queueName);
141
142                channel.close();
143
144                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
145                        throw new IOException("Ping-pong initialitzation has failed");
146                }
147        }
148
149        private static void addFaultTolerance() {
150                connection.addShutdownListener(new ShutdownListener() {
151                        @Override
152                        public void shutdownCompleted(ShutdownSignalException cause) {
153
154                                if (cause.isHardError()) {
155                                        if (connection.isOpen()) {
156                                                try {
157                                                        connection.close();
158                                                } catch (IOException e) {
159                                                        e.printStackTrace();
160                                                }
161                                        }
162                                        try {
163                                                Properties env = Environment.getEnvironment();
164                                                connection = OmqConnectionFactory.getNewWorkingConnection(env);
165                                                channel = connection.createChannel();
166                                                addFaultTolerance();
167                                        } catch (Exception e) {
168                                                e.printStackTrace();
169                                        }
170                                } else {
171                                        Channel channel = (Channel) cause.getReference();
172                                        if (channel.isOpen()) {
173                                                try {
174                                                        channel.close();
175                                                } catch (IOException e) {
176                                                        e.printStackTrace();
177                                                }
178                                        }
179                                }
180                        }
181                });
182        }
183
184}
Note: See TracBrowser for help on using the repository browser.