source: trunk/src/main/java/omq/server/RemoteObject.java @ 83

Last change on this file since 83 was 83, checked in by stoda, 11 years ago

J

File size: 8.6 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.HashMap;
7import java.util.List;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.common.broker.Broker;
13import omq.common.util.ParameterQueue;
14import omq.exception.SerializerException;
15
16import org.apache.log4j.Logger;
17
18import com.rabbitmq.client.Channel;
19import com.rabbitmq.client.ConsumerCancelledException;
20import com.rabbitmq.client.QueueingConsumer;
21import com.rabbitmq.client.QueueingConsumer.Delivery;
22import com.rabbitmq.client.ShutdownSignalException;
23
24/**
25 * A RemoteObject when it's started will be waiting for requests and will invoke
26 * them. When a RemoteObject is started it listens two queues, the first one has
27 * the same name as its reference and the second one is its multiqueue -this
28 * name can be set using a property, be aware to use a name not used by another
29 * object!!!-.
30 *
31 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
32 *
33 */
34public abstract class RemoteObject extends Thread implements Remote {
35
36        private static final long serialVersionUID = -1778953938739846450L;
37        private static final String multi = "multi#";
38        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
39
40        private String UID;
41        private String multiQueue;
42        private Properties env;
43        private transient Broker broker;
44        private transient RemoteWrapper remoteWrapper;
45        private transient Map<String, List<Class<?>>> params;
46        private transient Channel channel;
47        private transient QueueingConsumer consumer;
48        private transient boolean killed = false;
49
50        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
51
52        static {
53                primitiveClasses.put("byte", Byte.class);
54                primitiveClasses.put("short", Short.class);
55                primitiveClasses.put("char", Character.class);
56                primitiveClasses.put("int", Integer.class);
57                primitiveClasses.put("long", Long.class);
58                primitiveClasses.put("float", Float.class);
59                primitiveClasses.put("double", Double.class);
60        }
61
62        public RemoteObject() {
63        }
64
65        /**
66         * This method starts a remoteObject.
67         *
68         * @param reference
69         *            - broker's binding referece
70         * @param broker
71         *            - broker that binds this remoteObject
72         * @param env
73         *            - properties of this remoteObject
74         * @throws Exception
75         */
76        public void startRemoteObject(String reference, Broker broker, Properties env) throws Exception {
77                this.broker = broker;
78                this.UID = reference;
79                this.multiQueue = UID + System.currentTimeMillis();
80                this.env = env;
81
82                this.params = new HashMap<String, List<Class<?>>>();
83                for (Method m : this.getClass().getMethods()) {
84                        List<Class<?>> list = new ArrayList<Class<?>>();
85                        for (Class<?> clazz : m.getParameterTypes()) {
86                                list.add(clazz);
87                        }
88                        this.params.put(m.getName(), list);
89                }
90
91                // Get num threads to use
92                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
93                this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
94
95                startQueues();
96
97                // Start this listener
98                this.start();
99        }
100
101        @Override
102        public void run() {
103                while (!killed) {
104                        try {
105                                Delivery delivery = consumer.nextDelivery();
106
107                                logger.debug(UID + " has received a message, serializer: " + delivery.getProperties().getType());
108
109                                remoteWrapper.notifyDelivery(delivery);
110                        } catch (InterruptedException i) {
111                                logger.error(i);
112                        } catch (ShutdownSignalException e) {
113                                logger.error(e);
114                                try {
115                                        if (channel.isOpen()) {
116                                                channel.close();
117                                        }
118                                        startQueues();
119                                } catch (Exception e1) {
120                                        try {
121                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
122                                                Thread.sleep(milis);
123                                        } catch (InterruptedException e2) {
124                                                logger.error(e2);
125                                        }
126                                        logger.error(e1);
127                                }
128                        } catch (ConsumerCancelledException e) {
129                                logger.error(e);
130                        } catch (SerializerException e) {
131                                logger.error(e);
132                        } catch (Exception e) {
133                                logger.error(e);
134                        }
135                }
136        }
137
138        @Override
139        public String getRef() {
140                return UID;
141        }
142
143        /**
144         * This method kills all the threads waiting for requests
145         *
146         * @throws IOException
147         *             - If an operation failed.
148         */
149        public void kill() throws IOException {
150                logger.warn("Killing objectmq: " + this.getRef());
151                killed = true;
152                interrupt();
153                channel.close();
154                remoteWrapper.stopRemoteWrapper();
155        }
156
157        /**
158         * This method invokes the method specified by methodName and arguments
159         *
160         * @param methodName
161         * @param arguments
162         * @return result
163         * @throws Exception
164         */
165        public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
166
167                // Get the specific method identified by methodName and its arguments
168                Method method = loadMethod(methodName, arguments);
169
170                return method.invoke(this, arguments);
171        }
172
173        /**
174         * This method loads the method specified by methodName and args
175         *
176         * @param methodName
177         * @param args
178         * @return method
179         * @throws NoSuchMethodException
180         *             - If the method cannot be found
181         */
182        private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
183                Method m = null;
184
185                // Obtain the class reference
186                Class<?> clazz = this.getClass();
187                Class<?>[] argArray = null;
188
189                if (args != null) {
190                        argArray = new Class<?>[args.length];
191                        for (int i = 0; i < args.length; i++) {
192                                argArray[i] = args[i].getClass();
193                        }
194                }
195
196                try {
197                        m = clazz.getMethod(methodName, argArray);
198                } catch (NoSuchMethodException nsm) {
199                        m = loadMethodWithPrimitives(methodName, argArray);
200                }
201                return m;
202        }
203
204        /**
205         * This method loads a method which uses primitives as arguments
206         *
207         * @param methodName
208         *            - name of the method wanted to invoke
209         * @param argArray
210         *            - arguments
211         * @return method
212         * @throws NoSuchMethodException
213         *             - If the method cannot be found
214         */
215        private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
216                if (argArray != null) {
217                        Method[] methods = this.getClass().getMethods();
218                        int length = argArray.length;
219
220                        for (Method method : methods) {
221                                String name = method.getName();
222                                int argsLength = method.getParameterTypes().length;
223
224                                if (name.equals(methodName) && length == argsLength) {
225                                        // This array can have primitive types inside
226                                        Class<?>[] params = method.getParameterTypes();
227
228                                        boolean found = true;
229
230                                        for (int i = 0; i < length; i++) {
231                                                if (params[i].isPrimitive()) {
232                                                        Class<?> paramWrapper = primitiveClasses.get(params[i].getName());
233
234                                                        if (!paramWrapper.equals(argArray[i])) {
235                                                                found = false;
236                                                                break;
237                                                        }
238                                                }
239                                        }
240                                        if (found) {
241                                                return method;
242                                        }
243                                }
244                        }
245                }
246                throw new NoSuchMethodException(methodName);
247        }
248
249        public List<Class<?>> getParams(String methodName) {
250                return params.get(methodName);
251        }
252
253        public Channel getChannel() {
254                return channel;
255        }
256
257        /**
258         * This method starts the queues using the information got in the
259         * environment.
260         *
261         * @throws Exception
262         */
263        private void startQueues() throws Exception {
264                // Get info about which exchange and queue will use
265                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
266                String queue = UID;
267                String routingKey = UID;
268
269                // Multi info
270                String multiExchange = multi + UID;
271                if (env.getProperty(ParameterQueue.MULTI_QUEUE_NAME) != null) {
272                        multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
273                }
274
275                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
276                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
277                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
278
279                // Start channel
280                channel = broker.getNewChannel();
281
282                // Declares and bindings
283                logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
284                                + exclusive + ", AutoDelete: " + autoDelete);
285                channel.exchangeDeclare(exchange, "direct");
286                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
287                channel.queueBind(queue, exchange, routingKey);
288
289                logger.info("RemoteObject: " + UID + " declaring fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + durable
290                                + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete);
291                channel.exchangeDeclare(multiExchange, "fanout");
292                channel.queueDeclare(multiQueue, durable, exclusive, autoDelete, null);
293                channel.queueBind(multiQueue, multiExchange, "");
294
295                // Declare a new consumer
296                consumer = new QueueingConsumer(channel);
297                channel.basicConsume(queue, true, consumer);
298                channel.basicConsume(multiQueue, true, consumer);
299        }
300
301}
Note: See TracBrowser for help on using the repository browser.