source: trunk/src/main/java/omq/server/RemoteObject.java @ 54

Last change on this file since 54 was 54, checked in by stoda, 11 years ago

Adding @MultiMethod?
Broker is not a singleton.

File size: 7.1 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.Collection;
7import java.util.HashMap;
8import java.util.List;
9import java.util.Map;
10import java.util.Properties;
11
12import org.apache.log4j.Logger;
13
14import omq.Remote;
15import omq.common.broker.Broker;
16import omq.common.event.Event;
17import omq.common.event.EventListener;
18import omq.common.event.EventWrapper;
19import omq.common.util.ParameterQueue;
20import omq.common.util.Serializer;
21import omq.exception.SerializerException;
22
23import com.rabbitmq.client.Channel;
24import com.rabbitmq.client.ConsumerCancelledException;
25import com.rabbitmq.client.QueueingConsumer;
26import com.rabbitmq.client.QueueingConsumer.Delivery;
27import com.rabbitmq.client.ShutdownSignalException;
28
29/**
30 *
31 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
32 *
33 */
34public abstract class RemoteObject extends Thread implements Remote {
35
36        private static final long serialVersionUID = -1778953938739846450L;
37        private static final String multi = "multi#";
38        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
39
40        private String UID;
41        private Properties env;
42        private transient Broker broker;
43        private transient Serializer serializer;
44        private transient RemoteWrapper remoteWrapper;
45        private transient Map<String, List<Class<?>>> params;
46        private transient Channel channel;
47        private transient QueueingConsumer consumer;
48        private transient boolean killed = false;
49
50        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
51
52        static {
53                primitiveClasses.put("byte", Byte.class);
54                primitiveClasses.put("short", Short.class);
55                primitiveClasses.put("char", Character.class);
56                primitiveClasses.put("int", Integer.class);
57                primitiveClasses.put("long", Long.class);
58                primitiveClasses.put("float", Float.class);
59                primitiveClasses.put("double", Double.class);
60        }
61
62        public RemoteObject() {
63        }
64
65        public void startRemoteObject(String reference, Broker broker) throws Exception {
66                this.broker = broker;
67                UID = reference;
68                env = broker.getEnvironment();
69                serializer = broker.getSerializer();
70
71                params = new HashMap<String, List<Class<?>>>();
72                for (Method m : this.getClass().getMethods()) {
73                        List<Class<?>> list = new ArrayList<Class<?>>();
74                        for (Class<?> clazz : m.getParameterTypes()) {
75                                list.add(clazz);
76                        }
77                        params.put(m.getName(), list);
78                }
79
80                // Get num threads to use
81                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
82                remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
83
84                startQueues();
85
86                // Start this listener
87                this.start();
88        }
89
90        @Override
91        public void run() {
92                while (!killed) {
93                        try {
94                                Delivery delivery = consumer.nextDelivery();
95
96                                logger.debug(UID + " has received a message");
97
98                                remoteWrapper.notifyDelivery(delivery);
99                        } catch (InterruptedException i) {
100                                logger.error(i);
101                        } catch (ShutdownSignalException e) {
102                                logger.error(e);
103                                try {
104                                        if (channel.isOpen()) {
105                                                channel.close();
106                                        }
107                                        startQueues();
108                                } catch (Exception e1) {
109                                        try {
110                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
111                                                Thread.sleep(milis);
112                                        } catch (InterruptedException e2) {
113                                                logger.error(e2);
114                                        }
115                                        logger.error(e1);
116                                }
117                        } catch (ConsumerCancelledException e) {
118                                logger.error(e);
119                        } catch (SerializerException e) {
120                                logger.error(e);
121                        } catch (Exception e) {
122                                logger.error(e);
123                        }
124                }
125        }
126
127        @Override
128        public String getRef() {
129                return UID;
130        }
131
132        @Override
133        public void notifyEvent(Event event) throws IOException, SerializerException {
134                event.setTopic(UID);
135                EventWrapper wrapper = new EventWrapper(event);
136                channel.exchangeDeclare(UID, "fanout");
137                channel.basicPublish(UID, "", null, serializer.serialize(wrapper));
138        }
139
140        public void kill() throws IOException {
141                logger.warn("Killing objectmq: " + this.getRef());
142                killed = true;
143                interrupt();
144                channel.close();
145                remoteWrapper.stopRemoteWrapper();
146        }
147
148        public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
149
150                // Get the specific method identified by methodName and its arguments
151                Method method = loadMethod(methodName, arguments);
152
153                return method.invoke(this, arguments);
154        }
155
156        private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
157                Method m = null;
158
159                // Obtain the class reference
160                Class<?> clazz = this.getClass();
161                Class<?>[] argArray = null;
162
163                if (args != null) {
164                        argArray = new Class<?>[args.length];
165                        for (int i = 0; i < args.length; i++) {
166                                argArray[i] = args[i].getClass();
167                        }
168                }
169
170                try {
171                        m = clazz.getMethod(methodName, argArray);
172                } catch (NoSuchMethodException nsm) {
173                        m = loadMethodWithPrimitives(methodName, argArray);
174                }
175                return m;
176        }
177
178        private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
179                if (argArray != null) {
180                        Method[] methods = this.getClass().getMethods();
181                        int length = argArray.length;
182
183                        for (Method method : methods) {
184                                String name = method.getName();
185                                int argsLength = method.getParameterTypes().length;
186
187                                if (name.equals(methodName) && length == argsLength) {
188                                        // This array can have primitive types inside
189                                        Class<?>[] params = method.getParameterTypes();
190
191                                        boolean found = true;
192
193                                        for (int i = 0; i < length; i++) {
194                                                if (params[i].isPrimitive()) {
195                                                        Class<?> paramWrapper = primitiveClasses.get(params[i].getName());
196
197                                                        if (!paramWrapper.equals(argArray[i])) {
198                                                                found = false;
199                                                                break;
200                                                        }
201                                                }
202                                        }
203                                        if (found) {
204                                                return method;
205                                        }
206                                }
207                        }
208                }
209                throw new NoSuchMethodException(methodName);
210        }
211
212        public List<Class<?>> getParams(String methodName) {
213                return params.get(methodName);
214        }
215
216        public Channel getChannel() {
217                return channel;
218        }
219
220        private void startQueues() throws Exception {
221                // Get info about which exchange and queue will use
222                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
223                String queue = UID;
224                String routingKey = UID;
225                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
226
227                // Start channel
228                channel = broker.getNewChannel();
229
230                // Declares and bindings
231                logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
232                channel.exchangeDeclare(exchange, "direct");
233                channel.exchangeDeclare(multi + exchange, "fanout");
234                channel.queueDeclare(queue, durable, false, false, null);
235                channel.queueBind(queue, exchange, routingKey);
236                channel.queueBind(queue, multi + exchange, routingKey);
237
238                // Declare the event topic fanout
239                logger.info("RemoteObject: " + UID + " declaring fanout exchange: " + UID);
240                channel.exchangeDeclare(UID, "fanout");
241
242                // Declare a new consumer
243                consumer = new QueueingConsumer(channel);
244                channel.basicConsume(queue, true, consumer);
245        }
246
247        @Override
248        public void addListener(EventListener<?> eventListener) throws Exception {
249        }
250
251        @Override
252        public void removeListener(EventListener<?> eventListener) throws Exception {
253        }
254
255        @Override
256        public Collection<EventListener<?>> getListeners() throws Exception {
257                return null;
258        }
259
260}
Note: See TracBrowser for help on using the repository browser.