source: trunk/objectmq/src/omq/server/remote/request/RemoteObject.java @ 24

Last change on this file since 24 was 24, checked in by stoda, 11 years ago

fault tolerance in server if the rabbitmq server falls added

File size: 6.6 KB
Line 
1package omq.server.remote.request;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.Collection;
7import java.util.HashMap;
8import java.util.List;
9import java.util.Map;
10import java.util.Properties;
11
12import omq.Remote;
13import omq.common.broker.Broker;
14import omq.common.event.Event;
15import omq.common.event.EventListener;
16import omq.common.event.EventWrapper;
17import omq.common.util.ParameterQueue;
18import omq.common.util.Serializer;
19import omq.exception.SerializerException;
20
21import com.rabbitmq.client.Channel;
22import com.rabbitmq.client.ConsumerCancelledException;
23import com.rabbitmq.client.QueueingConsumer;
24import com.rabbitmq.client.QueueingConsumer.Delivery;
25import com.rabbitmq.client.ShutdownSignalException;
26
27/**
28 *
29 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
30 *
31 */
32public abstract class RemoteObject extends Thread implements Remote {
33
34        private static final long serialVersionUID = -1778953938739846450L;
35
36        private String UID;
37        private Properties env;
38        private transient RemoteWrapper remoteWrapper;
39        private transient Map<String, List<Class<?>>> params;
40        private transient Channel channel;
41        private transient QueueingConsumer consumer;
42        private transient boolean killed = false;
43
44        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
45
46        static {
47                primitiveClasses.put("byte", Byte.class);
48                primitiveClasses.put("short", Short.class);
49                primitiveClasses.put("char", Character.class);
50                primitiveClasses.put("int", Integer.class);
51                primitiveClasses.put("long", Long.class);
52                primitiveClasses.put("float", Float.class);
53                primitiveClasses.put("double", Double.class);
54        }
55
56        public RemoteObject() {
57        }
58
59        public void startRemoteObject(String reference, Properties env) throws Exception {
60                this.UID = reference;
61                this.env = env;
62
63                params = new HashMap<String, List<Class<?>>>();
64                for (Method m : this.getClass().getMethods()) {
65                        List<Class<?>> list = new ArrayList<Class<?>>();
66                        for (Class<?> clazz : m.getParameterTypes()) {
67                                list.add(clazz);
68                        }
69                        params.put(m.getName(), list);
70                }
71
72                // Get num threads to use
73                int numThreads = 4;// Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS));
74                remoteWrapper = new RemoteWrapper(this, numThreads);
75
76                startQueues();
77
78                // Start this listener
79                this.start();
80        }
81
82        @Override
83        public void run() {
84                while (!killed) {
85                        try {
86                                Delivery delivery = consumer.nextDelivery();
87                                System.out.println("RemoteObject: " + UID + " has received a message");
88                                remoteWrapper.notifyDelivery(delivery);
89                        } catch (InterruptedException i) {
90                                i.printStackTrace();
91                        } catch (ShutdownSignalException e) {
92                                e.printStackTrace();
93                                try {
94                                        if (channel.isOpen()) {
95                                                channel.close();
96                                        }
97                                        startQueues();
98                                } catch (Exception e1) {
99                                        try {
100                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
101                                                Thread.sleep(milis);
102                                        } catch (InterruptedException e2) {
103                                                // TODO Auto-generated catch block
104                                                e2.printStackTrace();
105                                        }
106                                        e1.printStackTrace();
107                                }
108                        } catch (ConsumerCancelledException e) {
109                                e.printStackTrace();
110                        } catch (SerializerException e) {
111                                e.printStackTrace();
112                        } catch (Exception e) {
113                                e.printStackTrace();
114                        }
115                }
116        }
117
118        @Override
119        public String getRef() {
120                return UID;
121        }
122
123        @Override
124        public void notifyEvent(Event event) throws IOException, SerializerException {
125                event.setTopic(UID);
126                EventWrapper wrapper = new EventWrapper(event);
127                channel.exchangeDeclare(UID, "fanout");
128                channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));
129        }
130
131        public void kill() throws IOException {
132                interrupt();
133                killed = true;
134                channel.close();
135                remoteWrapper.stopRemoteWrapper();
136        }
137
138        public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
139
140                // Get the specific method identified by methodName and its arguments
141                Method method = loadMethod(methodName, arguments);
142
143                return method.invoke(this, arguments);
144        }
145
146        private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
147                Method m = null;
148
149                // Obtain the class reference
150                Class<?> clazz = this.getClass();
151                Class<?>[] argArray = null;
152
153                if (args != null) {
154                        argArray = new Class<?>[args.length];
155                        for (int i = 0; i < args.length; i++) {
156                                argArray[i] = args[i].getClass();
157                        }
158                }
159
160                try {
161                        m = clazz.getMethod(methodName, argArray);
162                } catch (NoSuchMethodException nsm) {
163                        m = loadMethodWithPrimitives(methodName, argArray);
164                }
165                return m;
166        }
167
168        private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
169                Method[] methods = this.getClass().getMethods();
170                int length = argArray.length;
171
172                for (Method method : methods) {
173                        String name = method.getName();
174                        int argsLength = method.getParameterTypes().length;
175
176                        if (name.equals(methodName) && length == argsLength) {
177                                // This array can have primitive types inside
178                                Class<?>[] params = method.getParameterTypes();
179
180                                boolean found = true;
181
182                                for (int i = 0; i < length; i++) {
183                                        if (params[i].isPrimitive()) {
184                                                Class<?> paramWrapper = primitiveClasses.get(params[i].getName());
185
186                                                if (!paramWrapper.equals(argArray[i])) {
187                                                        found = false;
188                                                        break;
189                                                }
190                                        }
191                                }
192                                if (found) {
193                                        return method;
194                                }
195                        }
196                }
197                throw new NoSuchMethodException(methodName);
198        }
199
200        public List<Class<?>> getParams(String methodName) {
201                return params.get(methodName);
202        }
203
204        public Channel getChannel() {
205                return channel;
206        }
207
208        private void startQueues() throws Exception {
209                // Get info about which exchange and queue will use
210                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
211                String queue = UID;
212                String routingKey = UID;
213
214                // Start channel
215                channel = Broker.getNewChannel();
216
217                // Declares and bindings
218                System.out.println("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
219                channel.exchangeDeclare(exchange, "direct");
220                channel.queueDeclare(queue, false, false, false, null);
221                channel.queueBind(queue, exchange, routingKey);
222
223                // Declare the event topic fanout
224                System.out.println("RemoteObject: " + UID + " declaring fanout exchange: " + UID);
225                channel.exchangeDeclare(UID, "fanout");
226
227                // Declare a new consumer
228                consumer = new QueueingConsumer(channel);
229                channel.basicConsume(queue, true, consumer);
230        }
231
232        @Override
233        public void addListener(EventListener eventListener) throws Exception {
234        }
235
236        @Override
237        public void removeListener(EventListener eventListener) throws Exception {
238        }
239
240        @Override
241        public Collection<EventListener> getListeners() throws Exception {
242                return null;
243        }
244
245}
Note: See TracBrowser for help on using the repository browser.