source: branches/supervisor/src/main/java/omq/server/RemoteObject.java @ 92

Last change on this file since 92 was 92, checked in by stoda, 11 years ago

TODO: delete in supervisor
check the code

File size: 9.5 KB
RevLine 
[44]1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.HashMap;
7import java.util.List;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.common.broker.Broker;
13import omq.common.util.ParameterQueue;
14import omq.exception.SerializerException;
15
[72]16import org.apache.log4j.Logger;
17
[44]18import com.rabbitmq.client.Channel;
19import com.rabbitmq.client.ConsumerCancelledException;
20import com.rabbitmq.client.QueueingConsumer;
21import com.rabbitmq.client.QueueingConsumer.Delivery;
22import com.rabbitmq.client.ShutdownSignalException;
23
24/**
[83]25 * A RemoteObject when it's started will be waiting for requests and will invoke
26 * them. When a RemoteObject is started it listens two queues, the first one has
27 * the same name as its reference and the second one is its multiqueue -this
28 * name can be set using a property, be aware to use a name not used by another
29 * object!!!-.
[44]30 *
31 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
32 *
33 */
34public abstract class RemoteObject extends Thread implements Remote {
35
36        private static final long serialVersionUID = -1778953938739846450L;
[54]37        private static final String multi = "multi#";
[49]38        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
[44]39
[92]40        private String UID;
41        private Properties env;
42        private transient Broker broker;
43        private transient String multiQueue;
44        private transient RemoteWrapper remoteWrapper;
45        private transient Map<String, List<Class<?>>> params;
46        private transient Channel channel;
47        private transient QueueingConsumer consumer;
48        private transient boolean killed = false;
[44]49
50        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
51
52        static {
53                primitiveClasses.put("byte", Byte.class);
54                primitiveClasses.put("short", Short.class);
55                primitiveClasses.put("char", Character.class);
56                primitiveClasses.put("int", Integer.class);
57                primitiveClasses.put("long", Long.class);
58                primitiveClasses.put("float", Float.class);
59                primitiveClasses.put("double", Double.class);
60        }
61
62        public RemoteObject() {
63        }
64
[83]65        /**
66         * This method starts a remoteObject.
67         *
68         * @param reference
69         *            - broker's binding referece
70         * @param broker
71         *            - broker that binds this remoteObject
72         * @param env
73         *            - properties of this remoteObject
74         * @throws Exception
75         */
[74]76        public void startRemoteObject(String reference, Broker broker, Properties env) throws Exception {
[53]77                this.broker = broker;
[74]78                this.UID = reference;
79                this.env = env;
[44]80
[74]81                this.params = new HashMap<String, List<Class<?>>>();
[44]82                for (Method m : this.getClass().getMethods()) {
83                        List<Class<?>> list = new ArrayList<Class<?>>();
84                        for (Class<?> clazz : m.getParameterTypes()) {
85                                list.add(clazz);
86                        }
[74]87                        this.params.put(m.getName(), list);
[44]88                }
89
90                // Get num threads to use
91                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
[74]92                this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
[44]93
94                startQueues();
95
96                // Start this listener
97                this.start();
98        }
99
100        @Override
101        public void run() {
102                while (!killed) {
103                        try {
104                                Delivery delivery = consumer.nextDelivery();
[49]105
[75]106                                logger.debug(UID + " has received a message, serializer: " + delivery.getProperties().getType());
[49]107
[44]108                                remoteWrapper.notifyDelivery(delivery);
109                        } catch (InterruptedException i) {
[49]110                                logger.error(i);
[44]111                        } catch (ShutdownSignalException e) {
[49]112                                logger.error(e);
[44]113                                try {
114                                        if (channel.isOpen()) {
115                                                channel.close();
116                                        }
117                                        startQueues();
118                                } catch (Exception e1) {
119                                        try {
120                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
121                                                Thread.sleep(milis);
122                                        } catch (InterruptedException e2) {
[49]123                                                logger.error(e2);
[44]124                                        }
[49]125                                        logger.error(e1);
[44]126                                }
127                        } catch (ConsumerCancelledException e) {
[49]128                                logger.error(e);
[44]129                        } catch (SerializerException e) {
[49]130                                logger.error(e);
[44]131                        } catch (Exception e) {
[49]132                                logger.error(e);
[44]133                        }
134                }
135        }
136
137        @Override
138        public String getRef() {
139                return UID;
140        }
141
[83]142        /**
143         * This method kills all the threads waiting for requests
144         *
145         * @throws IOException
146         *             - If an operation failed.
147         */
[44]148        public void kill() throws IOException {
[49]149                logger.warn("Killing objectmq: " + this.getRef());
[44]150                killed = true;
151                interrupt();
152                channel.close();
153                remoteWrapper.stopRemoteWrapper();
154        }
155
[83]156        /**
157         * This method invokes the method specified by methodName and arguments
158         *
159         * @param methodName
160         * @param arguments
161         * @return result
162         * @throws Exception
163         */
[44]164        public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
165
166                // Get the specific method identified by methodName and its arguments
167                Method method = loadMethod(methodName, arguments);
168
169                return method.invoke(this, arguments);
170        }
171
[83]172        /**
173         * This method loads the method specified by methodName and args
174         *
175         * @param methodName
176         * @param args
177         * @return method
178         * @throws NoSuchMethodException
179         *             - If the method cannot be found
180         */
[44]181        private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
182                Method m = null;
183
184                // Obtain the class reference
185                Class<?> clazz = this.getClass();
186                Class<?>[] argArray = null;
187
188                if (args != null) {
189                        argArray = new Class<?>[args.length];
190                        for (int i = 0; i < args.length; i++) {
191                                argArray[i] = args[i].getClass();
192                        }
193                }
194
195                try {
196                        m = clazz.getMethod(methodName, argArray);
197                } catch (NoSuchMethodException nsm) {
198                        m = loadMethodWithPrimitives(methodName, argArray);
199                }
200                return m;
201        }
202
[83]203        /**
204         * This method loads a method which uses primitives as arguments
205         *
206         * @param methodName
207         *            - name of the method wanted to invoke
208         * @param argArray
209         *            - arguments
210         * @return method
211         * @throws NoSuchMethodException
212         *             - If the method cannot be found
213         */
[44]214        private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
215                if (argArray != null) {
216                        Method[] methods = this.getClass().getMethods();
217                        int length = argArray.length;
218
219                        for (Method method : methods) {
220                                String name = method.getName();
221                                int argsLength = method.getParameterTypes().length;
222
223                                if (name.equals(methodName) && length == argsLength) {
224                                        // This array can have primitive types inside
225                                        Class<?>[] params = method.getParameterTypes();
226
227                                        boolean found = true;
228
229                                        for (int i = 0; i < length; i++) {
230                                                if (params[i].isPrimitive()) {
231                                                        Class<?> paramWrapper = primitiveClasses.get(params[i].getName());
232
233                                                        if (!paramWrapper.equals(argArray[i])) {
234                                                                found = false;
235                                                                break;
236                                                        }
237                                                }
238                                        }
239                                        if (found) {
240                                                return method;
241                                        }
242                                }
243                        }
244                }
245                throw new NoSuchMethodException(methodName);
246        }
247
248        public List<Class<?>> getParams(String methodName) {
249                return params.get(methodName);
250        }
251
252        public Channel getChannel() {
253                return channel;
254        }
255
[92]256        public Broker getBroker() {
257                return broker;
258        }
259
[83]260        /**
261         * This method starts the queues using the information got in the
262         * environment.
263         *
264         * @throws Exception
265         */
[44]266        private void startQueues() throws Exception {
[84]267                // Start channel
268                channel = broker.getNewChannel();
269
270                /*
271                 * Default queue, Round Robin behaviour
272                 */
273
[44]274                // Get info about which exchange and queue will use
[84]275                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
[44]276                String queue = UID;
277                String routingKey = UID;
[77]278
[84]279                // RemoteObject default queue
280                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
[77]281                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
282                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
[44]283
284                // Declares and bindings
[84]285                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
286                        channel.exchangeDeclare(exchange, "direct");
287                }
288                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
289                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
290                        channel.queueBind(queue, exchange, routingKey);
291                }
292                logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
[77]293                                + exclusive + ", AutoDelete: " + autoDelete);
[44]294
[84]295                /*
296                 * Multi queue, exclusive per each instance
297                 */
298
299                // Get info about the multiQueue
300                String multiExchange = multi + UID;
301                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
302
303                // Multi queue (exclusive queue per remoteObject)
304                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
305                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
306                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
307
308                // Declares and bindings
[55]309                channel.exchangeDeclare(multiExchange, "fanout");
[84]310                if (multiQueue == null) {
311                        multiQueue = channel.queueDeclare().getQueue();
312                } else {
313                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
314                }
[55]315                channel.queueBind(multiQueue, multiExchange, "");
[84]316                logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
317                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
[55]318
[84]319                /*
320                 * Consumer
321                 */
322
[91]323                boolean autoAck = false;
324
[92]325                //TODO see if this is useless
326                int prefetchCount = 1;
327                channel.basicQos(prefetchCount);
328
[44]329                // Declare a new consumer
330                consumer = new QueueingConsumer(channel);
[91]331                channel.basicConsume(queue, autoAck, consumer);
332                channel.basicConsume(multiQueue, autoAck, consumer);
[44]333        }
334
335}
Note: See TracBrowser for help on using the repository browser.