source: trunk/objectmq/src/omq/common/broker/Broker.java @ 24

Last change on this file since 24 was 24, checked in by stoda, 12 years ago

fault tolerance in server if the rabbitmq server falls added

File size: 4.9 KB
Line 
1package omq.common.broker;
2
3import java.io.IOException;
4import java.util.Properties;
5
6import omq.Remote;
7import omq.client.proxy.Proxymq;
8import omq.client.remote.response.ResponseListener;
9import omq.common.event.Event;
10import omq.common.event.EventDispatcher;
11import omq.common.event.EventWrapper;
12import omq.common.util.Environment;
13import omq.common.util.OmqConnectionFactory;
14import omq.common.util.ParameterQueue;
15import omq.common.util.Serializer;
16import omq.exception.EnvironmentException;
17import omq.exception.InitBrokerException;
18import omq.exception.RemoteException;
19import omq.exception.SerializerException;
20import omq.server.remote.request.RemoteObject;
21
22import com.rabbitmq.client.Channel;
23import com.rabbitmq.client.Connection;
24import com.rabbitmq.client.QueueingConsumer;
25import com.rabbitmq.client.ShutdownListener;
26import com.rabbitmq.client.ShutdownSignalException;
27import com.rabbitmq.client.QueueingConsumer.Delivery;
28
29public class Broker {
30        private static Connection connection;
31        private static Channel channel;
32        private static boolean clientStarted = false;
33
34        public static void initBroker(Properties env) throws Exception {
35                try {
36                        Environment.getEnvironment();
37                } catch (EnvironmentException ex) { // environment not set.
38                        Environment.setEnvironment(env);
39                        connection = OmqConnectionFactory.getNewConnection(env);
40                        connection.addShutdownListener(new ShutdownListener() {
41                                @Override
42                                public void shutdownCompleted(ShutdownSignalException cause) {
43                                        if (connection.isOpen()) {
44                                                try {
45                                                        connection.close();
46                                                } catch (IOException e) {
47                                                        e.printStackTrace();
48                                                }
49                                        }
50                                        try {
51                                                Properties env = Environment.getEnvironment();
52                                                connection = OmqConnectionFactory.getNewWorkingConnection(env);
53                                                channel = connection.createChannel();
54                                        } catch (Exception e) {
55                                                e.printStackTrace();
56                                        }
57                                }
58                        });
59                        channel = connection.createChannel();
60                        try {
61                                tryConnection(env);
62                        } catch (Exception e) {
63                                channel.close();
64                                connection.close();
65                                throw new InitBrokerException("The connection didn't work");
66                        }
67                }
68        }
69
70        // TODO: what happens if the connection is not set
71        public static Connection getConnection() throws Exception {
72                return connection;
73        }
74
75        public static Channel getChannel() throws Exception {
76                return channel;
77        }
78
79        public static Channel getNewChannel() throws IOException {
80                return connection.createChannel();
81        }
82
83        public static Remote lookup(String reference, Class<?> contract) throws RemoteException {
84                try {
85                        Properties environment = Environment.getEnvironment();
86
87                        if (!clientStarted) {
88                                initClient(environment);
89                                clientStarted = true;
90                        }
91
92                        if (!Proxymq.containsProxy(reference)) {
93                                Proxymq proxy = new Proxymq(reference, contract, environment);
94                                Class<?>[] array = { contract };
95                                return (Remote) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
96                        }
97                        return (Remote) Proxymq.getInstance(reference);
98
99                } catch (Exception e) {
100                        throw new RemoteException(e);
101                }
102        }
103
104        public static void bind(String reference, RemoteObject remote) throws RemoteException {
105                try {
106                        Properties environment = Environment.getEnvironment();
107                        remote.startRemoteObject(reference, environment);
108                } catch (Exception e) {
109                        throw new RemoteException(e);
110                }
111        }
112
113        public static void unbind(String reference) throws RemoteException {
114
115        }
116
117        public void rebind(String name, Remote obj) throws RemoteException {
118
119        }
120
121        private static void initClient(Properties environment) throws Exception {
122                if (ResponseListener.isVoid()) {
123                        ResponseListener.init(environment);
124                }
125                if (EventDispatcher.isVoid()) {
126                        EventDispatcher.init(environment);
127                }
128        }
129
130        public static void trigger(Event event) throws IOException, SerializerException {
131                String UID = event.getTopic();
132                EventWrapper wrapper = new EventWrapper(event);
133                System.out.println("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
134                channel.exchangeDeclare(UID, "fanout");
135                channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));
136        }
137
138        public static void tryConnection(Properties env) throws Exception {
139                Channel channel = connection.createChannel();
140                String message = "ping";
141
142                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
143                String queueName = exchange;
144                String routingKey = "routingKey";
145
146                channel.exchangeDeclare(exchange, "direct");
147                channel.queueDeclare(queueName, false, false, false, null);
148                channel.queueBind(queueName, exchange, routingKey);
149
150                channel.basicPublish(exchange, routingKey, null, message.getBytes());
151
152                QueueingConsumer consumer = new QueueingConsumer(channel);
153
154                channel.basicConsume(queueName, true, consumer);
155                Delivery delivery = consumer.nextDelivery(1000);
156
157                channel.exchangeDelete(exchange);
158                channel.queueDelete(queueName);
159
160                channel.close();
161
162                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
163                        throw new IOException("Ping-pong initialitzation has failed");
164                }
165        }
166
167}
Note: See TracBrowser for help on using the repository browser.