source: branches/objectmq_old/src/omq/server/RemoteObject.java

Last change on this file was 38, checked in by stoda, 11 years ago

Remote exception added
todo: change throws Exception to Throwable since it seems that NoSuchMethodException? does not inherit from Exception...

File size: 6.7 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.Collection;
7import java.util.HashMap;
8import java.util.List;
9import java.util.Map;
10import java.util.Properties;
11
12import omq.Remote;
13import omq.common.broker.Broker;
14import omq.common.event.Event;
15import omq.common.event.EventListener;
16import omq.common.event.EventWrapper;
17import omq.common.util.ParameterQueue;
18import omq.common.util.Serializer;
19import omq.exception.SerializerException;
20
21import com.rabbitmq.client.Channel;
22import com.rabbitmq.client.ConsumerCancelledException;
23import com.rabbitmq.client.QueueingConsumer;
24import com.rabbitmq.client.QueueingConsumer.Delivery;
25import com.rabbitmq.client.ShutdownSignalException;
26
27/**
28 *
29 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
30 *
31 */
32public abstract class RemoteObject extends Thread implements Remote {
33
34        private static final long serialVersionUID = -1778953938739846450L;
35
36        private String UID;
37        private Properties env;
38        private transient RemoteWrapper remoteWrapper;
39        private transient Map<String, List<Class<?>>> params;
40        private transient Channel channel;
41        private transient QueueingConsumer consumer;
42        private transient boolean killed = false;
43
44        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
45
46        static {
47                primitiveClasses.put("byte", Byte.class);
48                primitiveClasses.put("short", Short.class);
49                primitiveClasses.put("char", Character.class);
50                primitiveClasses.put("int", Integer.class);
51                primitiveClasses.put("long", Long.class);
52                primitiveClasses.put("float", Float.class);
53                primitiveClasses.put("double", Double.class);
54        }
55
56        public RemoteObject() {
57        }
58
59        public void startRemoteObject(String reference, Properties env) throws Exception {
60                this.UID = reference;
61                this.env = env;
62
63                params = new HashMap<String, List<Class<?>>>();
64                for (Method m : this.getClass().getMethods()) {
65                        List<Class<?>> list = new ArrayList<Class<?>>();
66                        for (Class<?> clazz : m.getParameterTypes()) {
67                                list.add(clazz);
68                        }
69                        params.put(m.getName(), list);
70                }
71
72                // Get num threads to use
73                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
74                remoteWrapper = new RemoteWrapper(this, numThreads);
75
76                startQueues();
77
78                // Start this listener
79                this.start();
80        }
81
82        @Override
83        public void run() {
84                while (!killed) {
85                        try {
86                                Delivery delivery = consumer.nextDelivery();
87                                System.out.println("RemoteObject: " + UID + " has received a message");
88                                remoteWrapper.notifyDelivery(delivery);
89                        } catch (InterruptedException i) {
90                                i.printStackTrace();
91                        } catch (ShutdownSignalException e) {
92                                e.printStackTrace();
93                                try {
94                                        if (channel.isOpen()) {
95                                                channel.close();
96                                        }
97                                        startQueues();
98                                } catch (Exception e1) {
99                                        try {
100                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
101                                                Thread.sleep(milis);
102                                        } catch (InterruptedException e2) {
103                                                e2.printStackTrace();
104                                        }
105                                        e1.printStackTrace();
106                                }
107                        } catch (ConsumerCancelledException e) {
108                                e.printStackTrace();
109                        } catch (SerializerException e) {
110                                e.printStackTrace();
111                        } catch (Exception e) {
112                                e.printStackTrace();
113                        }
114                }
115        }
116
117        @Override
118        public String getRef() {
119                return UID;
120        }
121
122        @Override
123        public void notifyEvent(Event event) throws IOException, SerializerException {
124                event.setTopic(UID);
125                EventWrapper wrapper = new EventWrapper(event);
126                channel.exchangeDeclare(UID, "fanout");
127                channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));
128        }
129
130        public void kill() throws IOException {
131                killed = true;
132                interrupt();
133                channel.close();
134                remoteWrapper.stopRemoteWrapper();
135        }
136
137        public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
138
139                // Get the specific method identified by methodName and its arguments
140                Method method = loadMethod(methodName, arguments);
141
142                return method.invoke(this, arguments);
143        }
144
145        private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
146                Method m = null;
147
148                // Obtain the class reference
149                Class<?> clazz = this.getClass();
150                Class<?>[] argArray = null;
151
152                if (args != null) {
153                        argArray = new Class<?>[args.length];
154                        for (int i = 0; i < args.length; i++) {
155                                argArray[i] = args[i].getClass();
156                        }
157                }
158
159                try {
160                        m = clazz.getMethod(methodName, argArray);
161                } catch (NoSuchMethodException nsm) {
162                        m = loadMethodWithPrimitives(methodName, argArray);
163                }
164                return m;
165        }
166
167        private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
168                if (argArray != null) {
169                        Method[] methods = this.getClass().getMethods();
170                        int length = argArray.length;
171
172                        for (Method method : methods) {
173                                String name = method.getName();
174                                int argsLength = method.getParameterTypes().length;
175
176                                if (name.equals(methodName) && length == argsLength) {
177                                        // This array can have primitive types inside
178                                        Class<?>[] params = method.getParameterTypes();
179
180                                        boolean found = true;
181
182                                        for (int i = 0; i < length; i++) {
183                                                if (params[i].isPrimitive()) {
184                                                        Class<?> paramWrapper = primitiveClasses.get(params[i].getName());
185
186                                                        if (!paramWrapper.equals(argArray[i])) {
187                                                                found = false;
188                                                                break;
189                                                        }
190                                                }
191                                        }
192                                        if (found) {
193                                                return method;
194                                        }
195                                }
196                        }
197                }
198                throw new NoSuchMethodException(methodName);
199        }
200
201        public List<Class<?>> getParams(String methodName) {
202                return params.get(methodName);
203        }
204
205        public Channel getChannel() {
206                return channel;
207        }
208
209        private void startQueues() throws Exception {
210                // Get info about which exchange and queue will use
211                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
212                String queue = UID;
213                String routingKey = UID;
214                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
215
216                // Start channel
217                channel = Broker.getNewChannel();
218
219                // Declares and bindings
220                System.out.println("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
221                channel.exchangeDeclare(exchange, "direct");
222                channel.queueDeclare(queue, durable, false, false, null);
223                channel.queueBind(queue, exchange, routingKey);
224
225                // Declare the event topic fanout
226                System.out.println("RemoteObject: " + UID + " declaring fanout exchange: " + UID);
227                channel.exchangeDeclare(UID, "fanout");
228
229                // Declare a new consumer
230                consumer = new QueueingConsumer(channel);
231                channel.basicConsume(queue, true, consumer);
232        }
233
234        @Override
235        public void addListener(EventListener<?> eventListener) throws Exception {
236        }
237
238        @Override
239        public void removeListener(EventListener<?> eventListener) throws Exception {
240        }
241
242        @Override
243        public Collection<EventListener<?>> getListeners() throws Exception {
244                return null;
245        }
246
247}
Note: See TracBrowser for help on using the repository browser.