source: trunk/objectmq/src/omq/server/remote/request/RemoteObject.java @ 14

Last change on this file since 14 was 14, checked in by stoda, 11 years ago

Events added

File size: 5.4 KB
Line 
1package omq.server.remote.request;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.HashMap;
7import java.util.List;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.common.broker.Broker;
13import omq.common.event.Event;
14import omq.common.util.ParameterQueue;
15import omq.common.util.Serializer;
16import omq.exception.SerializerException;
17
18import com.rabbitmq.client.Channel;
19import com.rabbitmq.client.ConsumerCancelledException;
20import com.rabbitmq.client.QueueingConsumer;
21import com.rabbitmq.client.QueueingConsumer.Delivery;
22import com.rabbitmq.client.ShutdownSignalException;
23
24/**
25 *
26 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
27 *
28 */
29public abstract class RemoteObject extends Thread implements Remote {
30
31        private static final long serialVersionUID = -1778953938739846450L;
32
33        private String UID;
34        private transient RemoteWrapper remoteWrapper;
35        private transient Map<String, List<Class<?>>> params;
36        private transient Channel channel;
37        private transient QueueingConsumer consumer;
38        private transient boolean killed = false;
39
40        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
41
42        static {
43                primitiveClasses.put("byte", Byte.class);
44                primitiveClasses.put("short", Short.class);
45                primitiveClasses.put("char", Character.class);
46                primitiveClasses.put("int", Integer.class);
47                primitiveClasses.put("long", Long.class);
48                primitiveClasses.put("float", Float.class);
49                primitiveClasses.put("double", Double.class);
50        }
51
52        public RemoteObject() {
53        }
54
55        public void start(String reference, Properties env) throws Exception {
56                this.UID = reference;
57
58                params = new HashMap<String, List<Class<?>>>();
59                for (Method m : this.getClass().getMethods()) {
60                        List<Class<?>> list = new ArrayList<Class<?>>();
61                        for (Class<?> clazz : m.getParameterTypes()) {
62                                list.add(clazz);
63                        }
64                        params.put(m.getName(), list);
65                }
66
67                // Get num threads to use
68                int numThreads = 4;// Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS));
69                remoteWrapper = new RemoteWrapper(this, numThreads);
70
71                // Get info about which exchange and queue will use
72                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
73                String queue = UID;
74                String routingKey = UID;
75
76                // Start channel
77                channel = Broker.getNewChannel();
78
79                // Declares and bindings
80                channel.exchangeDeclare(exchange, "direct");
81                channel.queueDeclare(queue, false, false, false, null);
82                channel.queueBind(queue, exchange, routingKey);
83
84                // Declare the event topic fanout
85                channel.exchangeDeclare(UID, "fanout");
86
87                // Declare a new consumer
88                consumer = new QueueingConsumer(channel);
89                channel.basicConsume(queue, true, consumer);
90
91                // Start this listener
92                this.start();
93        }
94
95        @Override
96        public void run() {
97                while (!killed) {
98                        try {
99                                Delivery delivery = consumer.nextDelivery();
100                                remoteWrapper.notifyDelivery(delivery);
101                        } catch (InterruptedException i) {
102                                i.printStackTrace();
103                        } catch (ShutdownSignalException e) {
104                                e.printStackTrace();
105                        } catch (ConsumerCancelledException e) {
106                                e.printStackTrace();
107                        } catch (SerializerException e) {
108                                e.printStackTrace();
109                        } catch (Exception e) {
110                                e.printStackTrace();
111                        }
112                }
113        }
114
115        @Override
116        public String getRef() {
117                return UID;
118        }
119
120        @Override
121        public void notifyEvent(Event event) throws IOException, SerializerException {
122                event.setTopic(UID);
123                channel.exchangeDeclare(UID, "fanout");
124                channel.basicPublish(UID, "", null, Serializer.serialize(event));
125        }
126
127        public void kill() throws IOException {
128                interrupt();
129                killed = true;
130                channel.close();
131                remoteWrapper.stopRemoteWrapper();
132        }
133
134        public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
135
136                // Get the specific method identified by methodName and its arguments
137                Method method = loadMethod(methodName, arguments);
138
139                return method.invoke(this, arguments);
140        }
141
142        private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
143                Method m = null;
144
145                // Obtain the class reference
146                Class<?> clazz = this.getClass();
147                Class<?>[] argArray = null;
148
149                if (args != null) {
150                        argArray = new Class<?>[args.length];
151                        for (int i = 0; i < args.length; i++) {
152                                argArray[i] = args[i].getClass();
153                        }
154                }
155
156                try {
157                        m = clazz.getMethod(methodName, argArray);
158                } catch (NoSuchMethodException nsm) {
159                        m = loadMethodWithPrimitives(methodName, argArray);
160                }
161                return m;
162        }
163
164        private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
165                Method[] methods = this.getClass().getMethods();
166                int length = argArray.length;
167
168                for (Method method : methods) {
169                        String name = method.getName();
170                        int argsLength = method.getParameterTypes().length;
171
172                        if (name.equals(methodName) && length == argsLength) {
173                                // This array can have primitive types inside
174                                Class<?>[] params = method.getParameterTypes();
175
176                                boolean found = true;
177
178                                for (int i = 0; i < length; i++) {
179                                        if (params[i].isPrimitive()) {
180                                                Class<?> paramWrapper = primitiveClasses.get(params[i].getName());
181
182                                                if (!paramWrapper.equals(argArray[i])) {
183                                                        found = false;
184                                                        break;
185                                                }
186                                        }
187                                }
188                                if (found) {
189                                        return method;
190                                }
191                        }
192                }
193                throw new NoSuchMethodException(methodName);
194        }
195
196        public List<Class<?>> getParams(String methodName) {
197                return params.get(methodName);
198        }
199
200        public Channel getChannel() {
201                return channel;
202        }
203
204}
Note: See TracBrowser for help on using the repository browser.