source: trunk/src/main/java/omq/server/RemoteObject.java @ 80

Last change on this file since 80 was 77, checked in by stoda, 11 years ago

ParameterQueues? changed, added some properties to modify the queues

File size: 7.3 KB
RevLine 
[44]1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.HashMap;
7import java.util.List;
8import java.util.Map;
9import java.util.Properties;
10
11import omq.Remote;
12import omq.common.broker.Broker;
13import omq.common.util.ParameterQueue;
14import omq.exception.SerializerException;
15
[72]16import org.apache.log4j.Logger;
17
[44]18import com.rabbitmq.client.Channel;
19import com.rabbitmq.client.ConsumerCancelledException;
20import com.rabbitmq.client.QueueingConsumer;
21import com.rabbitmq.client.QueueingConsumer.Delivery;
22import com.rabbitmq.client.ShutdownSignalException;
23
24/**
25 *
26 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
27 *
28 */
29public abstract class RemoteObject extends Thread implements Remote {
30
31        private static final long serialVersionUID = -1778953938739846450L;
[54]32        private static final String multi = "multi#";
[49]33        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
[44]34
35        private String UID;
[58]36        private String multiQueue;
[44]37        private Properties env;
[53]38        private transient Broker broker;
[44]39        private transient RemoteWrapper remoteWrapper;
40        private transient Map<String, List<Class<?>>> params;
41        private transient Channel channel;
42        private transient QueueingConsumer consumer;
43        private transient boolean killed = false;
44
45        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
46
47        static {
48                primitiveClasses.put("byte", Byte.class);
49                primitiveClasses.put("short", Short.class);
50                primitiveClasses.put("char", Character.class);
51                primitiveClasses.put("int", Integer.class);
52                primitiveClasses.put("long", Long.class);
53                primitiveClasses.put("float", Float.class);
54                primitiveClasses.put("double", Double.class);
55        }
56
57        public RemoteObject() {
58        }
59
[74]60        public void startRemoteObject(String reference, Broker broker, Properties env) throws Exception {
[53]61                this.broker = broker;
[74]62                this.UID = reference;
63                this.multiQueue = UID + System.currentTimeMillis();
64                this.env = env;
[44]65
[74]66                this.params = new HashMap<String, List<Class<?>>>();
[44]67                for (Method m : this.getClass().getMethods()) {
68                        List<Class<?>> list = new ArrayList<Class<?>>();
69                        for (Class<?> clazz : m.getParameterTypes()) {
70                                list.add(clazz);
71                        }
[74]72                        this.params.put(m.getName(), list);
[44]73                }
74
75                // Get num threads to use
76                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
[74]77                this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
[44]78
79                startQueues();
80
81                // Start this listener
82                this.start();
83        }
84
[66]85        public void startTriggerEvent(String reference, Broker broker) throws Exception {
86                this.broker = broker;
87                UID = reference;
88                if (channel == null || !channel.isOpen()) {
89                        channel = broker.getChannel();
90                }
91        }
92
[44]93        @Override
94        public void run() {
95                while (!killed) {
96                        try {
97                                Delivery delivery = consumer.nextDelivery();
[49]98
[75]99                                logger.debug(UID + " has received a message, serializer: " + delivery.getProperties().getType());
[49]100
[44]101                                remoteWrapper.notifyDelivery(delivery);
102                        } catch (InterruptedException i) {
[49]103                                logger.error(i);
[44]104                        } catch (ShutdownSignalException e) {
[49]105                                logger.error(e);
[44]106                                try {
107                                        if (channel.isOpen()) {
108                                                channel.close();
109                                        }
110                                        startQueues();
111                                } catch (Exception e1) {
112                                        try {
113                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
114                                                Thread.sleep(milis);
115                                        } catch (InterruptedException e2) {
[49]116                                                logger.error(e2);
[44]117                                        }
[49]118                                        logger.error(e1);
[44]119                                }
120                        } catch (ConsumerCancelledException e) {
[49]121                                logger.error(e);
[44]122                        } catch (SerializerException e) {
[49]123                                logger.error(e);
[44]124                        } catch (Exception e) {
[49]125                                logger.error(e);
[44]126                        }
127                }
128        }
129
130        @Override
131        public String getRef() {
132                return UID;
133        }
134
135        public void kill() throws IOException {
[49]136                logger.warn("Killing objectmq: " + this.getRef());
[44]137                killed = true;
138                interrupt();
139                channel.close();
140                remoteWrapper.stopRemoteWrapper();
141        }
142
143        public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
144
145                // Get the specific method identified by methodName and its arguments
146                Method method = loadMethod(methodName, arguments);
147
148                return method.invoke(this, arguments);
149        }
150
151        private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
152                Method m = null;
153
154                // Obtain the class reference
155                Class<?> clazz = this.getClass();
156                Class<?>[] argArray = null;
157
158                if (args != null) {
159                        argArray = new Class<?>[args.length];
160                        for (int i = 0; i < args.length; i++) {
161                                argArray[i] = args[i].getClass();
162                        }
163                }
164
165                try {
166                        m = clazz.getMethod(methodName, argArray);
167                } catch (NoSuchMethodException nsm) {
168                        m = loadMethodWithPrimitives(methodName, argArray);
169                }
170                return m;
171        }
172
173        private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
174                if (argArray != null) {
175                        Method[] methods = this.getClass().getMethods();
176                        int length = argArray.length;
177
178                        for (Method method : methods) {
179                                String name = method.getName();
180                                int argsLength = method.getParameterTypes().length;
181
182                                if (name.equals(methodName) && length == argsLength) {
183                                        // This array can have primitive types inside
184                                        Class<?>[] params = method.getParameterTypes();
185
186                                        boolean found = true;
187
188                                        for (int i = 0; i < length; i++) {
189                                                if (params[i].isPrimitive()) {
190                                                        Class<?> paramWrapper = primitiveClasses.get(params[i].getName());
191
192                                                        if (!paramWrapper.equals(argArray[i])) {
193                                                                found = false;
194                                                                break;
195                                                        }
196                                                }
197                                        }
198                                        if (found) {
199                                                return method;
200                                        }
201                                }
202                        }
203                }
204                throw new NoSuchMethodException(methodName);
205        }
206
207        public List<Class<?>> getParams(String methodName) {
208                return params.get(methodName);
209        }
210
211        public Channel getChannel() {
212                return channel;
213        }
214
215        private void startQueues() throws Exception {
216                // Get info about which exchange and queue will use
217                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
218                String queue = UID;
219                String routingKey = UID;
[77]220
[55]221                // Multi info
[75]222                String multiExchange = multi + UID;
[77]223                if (env.getProperty(ParameterQueue.MULTI_QUEUE_NAME) != null) {
224                        multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
225                }
[55]226
[44]227                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
[77]228                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
229                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
[44]230
231                // Start channel
[53]232                channel = broker.getNewChannel();
[44]233
234                // Declares and bindings
[77]235                logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
236                                + exclusive + ", AutoDelete: " + autoDelete);
[44]237                channel.exchangeDeclare(exchange, "direct");
[77]238                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
[44]239                channel.queueBind(queue, exchange, routingKey);
240
[77]241                logger.info("RemoteObject: " + UID + " declaring fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + durable
242                                + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete);
[55]243                channel.exchangeDeclare(multiExchange, "fanout");
[77]244                channel.queueDeclare(multiQueue, durable, exclusive, autoDelete, null);
[55]245                channel.queueBind(multiQueue, multiExchange, "");
246
[44]247                // Declare a new consumer
248                consumer = new QueueingConsumer(channel);
249                channel.basicConsume(queue, true, consumer);
[55]250                channel.basicConsume(multiQueue, true, consumer);
[44]251        }
252
253}
Note: See TracBrowser for help on using the repository browser.