source: branches/supervisor/src/main/java/omq/server/RemoteObject.java @ 90

Last change on this file since 90 was 84, checked in by stoda, 11 years ago

Default queues added, default exchange enabled, more control in remote queues added.
Tests verified and changed Persistent test to show how to make persistent messages.

File size: 9.3 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.HashMap;
7import java.util.List;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.common.broker.Broker;
13import omq.common.util.ParameterQueue;
14import omq.exception.SerializerException;
15
16import org.apache.log4j.Logger;
17
18import com.rabbitmq.client.Channel;
19import com.rabbitmq.client.ConsumerCancelledException;
20import com.rabbitmq.client.QueueingConsumer;
21import com.rabbitmq.client.QueueingConsumer.Delivery;
22import com.rabbitmq.client.ShutdownSignalException;
23
24/**
25 * A RemoteObject when it's started will be waiting for requests and will invoke
26 * them. When a RemoteObject is started it listens two queues, the first one has
27 * the same name as its reference and the second one is its multiqueue -this
28 * name can be set using a property, be aware to use a name not used by another
29 * object!!!-.
30 *
31 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
32 *
33 */
34public abstract class RemoteObject extends Thread implements Remote {
35
36        private static final long serialVersionUID = -1778953938739846450L;
37        private static final String multi = "multi#";
38        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
39
40        private String UID;
41        private Properties env;
42        private transient Broker broker;
43        private transient String multiQueue;
44        private transient RemoteWrapper remoteWrapper;
45        private transient Map<String, List<Class<?>>> params;
46        private transient Channel channel;
47        private transient QueueingConsumer consumer;
48        private transient boolean killed = false;
49
50        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
51
52        static {
53                primitiveClasses.put("byte", Byte.class);
54                primitiveClasses.put("short", Short.class);
55                primitiveClasses.put("char", Character.class);
56                primitiveClasses.put("int", Integer.class);
57                primitiveClasses.put("long", Long.class);
58                primitiveClasses.put("float", Float.class);
59                primitiveClasses.put("double", Double.class);
60        }
61
62        public RemoteObject() {
63        }
64
65        /**
66         * This method starts a remoteObject.
67         *
68         * @param reference
69         *            - broker's binding referece
70         * @param broker
71         *            - broker that binds this remoteObject
72         * @param env
73         *            - properties of this remoteObject
74         * @throws Exception
75         */
76        public void startRemoteObject(String reference, Broker broker, Properties env) throws Exception {
77                this.broker = broker;
78                this.UID = reference;
79                this.env = env;
80
81                this.params = new HashMap<String, List<Class<?>>>();
82                for (Method m : this.getClass().getMethods()) {
83                        List<Class<?>> list = new ArrayList<Class<?>>();
84                        for (Class<?> clazz : m.getParameterTypes()) {
85                                list.add(clazz);
86                        }
87                        this.params.put(m.getName(), list);
88                }
89
90                // Get num threads to use
91                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
92                this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
93
94                startQueues();
95
96                // Start this listener
97                this.start();
98        }
99
100        @Override
101        public void run() {
102                while (!killed) {
103                        try {
104                                Delivery delivery = consumer.nextDelivery();
105
106                                logger.debug(UID + " has received a message, serializer: " + delivery.getProperties().getType());
107
108                                remoteWrapper.notifyDelivery(delivery);
109                        } catch (InterruptedException i) {
110                                logger.error(i);
111                        } catch (ShutdownSignalException e) {
112                                logger.error(e);
113                                try {
114                                        if (channel.isOpen()) {
115                                                channel.close();
116                                        }
117                                        startQueues();
118                                } catch (Exception e1) {
119                                        try {
120                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
121                                                Thread.sleep(milis);
122                                        } catch (InterruptedException e2) {
123                                                logger.error(e2);
124                                        }
125                                        logger.error(e1);
126                                }
127                        } catch (ConsumerCancelledException e) {
128                                logger.error(e);
129                        } catch (SerializerException e) {
130                                logger.error(e);
131                        } catch (Exception e) {
132                                logger.error(e);
133                        }
134                }
135        }
136
137        @Override
138        public String getRef() {
139                return UID;
140        }
141
142        /**
143         * This method kills all the threads waiting for requests
144         *
145         * @throws IOException
146         *             - If an operation failed.
147         */
148        public void kill() throws IOException {
149                logger.warn("Killing objectmq: " + this.getRef());
150                killed = true;
151                interrupt();
152                channel.close();
153                remoteWrapper.stopRemoteWrapper();
154        }
155
156        /**
157         * This method invokes the method specified by methodName and arguments
158         *
159         * @param methodName
160         * @param arguments
161         * @return result
162         * @throws Exception
163         */
164        public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
165
166                // Get the specific method identified by methodName and its arguments
167                Method method = loadMethod(methodName, arguments);
168
169                return method.invoke(this, arguments);
170        }
171
172        /**
173         * This method loads the method specified by methodName and args
174         *
175         * @param methodName
176         * @param args
177         * @return method
178         * @throws NoSuchMethodException
179         *             - If the method cannot be found
180         */
181        private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
182                Method m = null;
183
184                // Obtain the class reference
185                Class<?> clazz = this.getClass();
186                Class<?>[] argArray = null;
187
188                if (args != null) {
189                        argArray = new Class<?>[args.length];
190                        for (int i = 0; i < args.length; i++) {
191                                argArray[i] = args[i].getClass();
192                        }
193                }
194
195                try {
196                        m = clazz.getMethod(methodName, argArray);
197                } catch (NoSuchMethodException nsm) {
198                        m = loadMethodWithPrimitives(methodName, argArray);
199                }
200                return m;
201        }
202
203        /**
204         * This method loads a method which uses primitives as arguments
205         *
206         * @param methodName
207         *            - name of the method wanted to invoke
208         * @param argArray
209         *            - arguments
210         * @return method
211         * @throws NoSuchMethodException
212         *             - If the method cannot be found
213         */
214        private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
215                if (argArray != null) {
216                        Method[] methods = this.getClass().getMethods();
217                        int length = argArray.length;
218
219                        for (Method method : methods) {
220                                String name = method.getName();
221                                int argsLength = method.getParameterTypes().length;
222
223                                if (name.equals(methodName) && length == argsLength) {
224                                        // This array can have primitive types inside
225                                        Class<?>[] params = method.getParameterTypes();
226
227                                        boolean found = true;
228
229                                        for (int i = 0; i < length; i++) {
230                                                if (params[i].isPrimitive()) {
231                                                        Class<?> paramWrapper = primitiveClasses.get(params[i].getName());
232
233                                                        if (!paramWrapper.equals(argArray[i])) {
234                                                                found = false;
235                                                                break;
236                                                        }
237                                                }
238                                        }
239                                        if (found) {
240                                                return method;
241                                        }
242                                }
243                        }
244                }
245                throw new NoSuchMethodException(methodName);
246        }
247
248        public List<Class<?>> getParams(String methodName) {
249                return params.get(methodName);
250        }
251
252        public Channel getChannel() {
253                return channel;
254        }
255
256        /**
257         * This method starts the queues using the information got in the
258         * environment.
259         *
260         * @throws Exception
261         */
262        private void startQueues() throws Exception {
263                // Start channel
264                channel = broker.getNewChannel();
265
266                /*
267                 * Default queue, Round Robin behaviour
268                 */
269
270                // Get info about which exchange and queue will use
271                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
272                String queue = UID;
273                String routingKey = UID;
274
275                // RemoteObject default queue
276                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
277                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
278                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
279
280                // Declares and bindings
281                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
282                        channel.exchangeDeclare(exchange, "direct");
283                }
284                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
285                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
286                        channel.queueBind(queue, exchange, routingKey);
287                }
288                logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
289                                + exclusive + ", AutoDelete: " + autoDelete);
290
291                /*
292                 * Multi queue, exclusive per each instance
293                 */
294
295                // Get info about the multiQueue
296                String multiExchange = multi + UID;
297                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
298
299                // Multi queue (exclusive queue per remoteObject)
300                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
301                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
302                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
303
304                // Declares and bindings
305                channel.exchangeDeclare(multiExchange, "fanout");
306                if (multiQueue == null) {
307                        multiQueue = channel.queueDeclare().getQueue();
308                } else {
309                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
310                }
311                channel.queueBind(multiQueue, multiExchange, "");
312                logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
313                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
314
315                /*
316                 * Consumer
317                 */
318
319                // Declare a new consumer
320                consumer = new QueueingConsumer(channel);
321                channel.basicConsume(queue, true, consumer);
322                channel.basicConsume(multiQueue, true, consumer);
323        }
324
325}
Note: See TracBrowser for help on using the repository browser.