source: trunk/src/main/java/omq/server/RemoteObject.java @ 53

Last change on this file since 53 was 53, checked in by stoda, 11 years ago

Non static broker
TODO: change all test to see whether the new broker configuration works

File size: 6.9 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.Collection;
7import java.util.HashMap;
8import java.util.List;
9import java.util.Map;
10import java.util.Properties;
11
12import org.apache.log4j.Logger;
13
14import omq.Remote;
15import omq.common.broker.Broker;
16import omq.common.event.Event;
17import omq.common.event.EventListener;
18import omq.common.event.EventWrapper;
19import omq.common.util.ParameterQueue;
20import omq.common.util.Serializer;
21import omq.exception.SerializerException;
22
23import com.rabbitmq.client.Channel;
24import com.rabbitmq.client.ConsumerCancelledException;
25import com.rabbitmq.client.QueueingConsumer;
26import com.rabbitmq.client.QueueingConsumer.Delivery;
27import com.rabbitmq.client.ShutdownSignalException;
28
29/**
30 *
31 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
32 *
33 */
34public abstract class RemoteObject extends Thread implements Remote {
35
36        private static final long serialVersionUID = -1778953938739846450L;
37        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
38
39        private String UID;
40        private Properties env;
41        private transient Broker broker;
42        private transient Serializer serializer;
43        private transient RemoteWrapper remoteWrapper;
44        private transient Map<String, List<Class<?>>> params;
45        private transient Channel channel;
46        private transient QueueingConsumer consumer;
47        private transient boolean killed = false;
48
49        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
50
51        static {
52                primitiveClasses.put("byte", Byte.class);
53                primitiveClasses.put("short", Short.class);
54                primitiveClasses.put("char", Character.class);
55                primitiveClasses.put("int", Integer.class);
56                primitiveClasses.put("long", Long.class);
57                primitiveClasses.put("float", Float.class);
58                primitiveClasses.put("double", Double.class);
59        }
60
61        public RemoteObject() {
62        }
63
64        public void startRemoteObject(String reference, Broker broker) throws Exception {
65                this.broker = broker;
66                UID = reference;
67                env = broker.getEnvironment();
68                serializer = broker.getSerializer();
69
70                params = new HashMap<String, List<Class<?>>>();
71                for (Method m : this.getClass().getMethods()) {
72                        List<Class<?>> list = new ArrayList<Class<?>>();
73                        for (Class<?> clazz : m.getParameterTypes()) {
74                                list.add(clazz);
75                        }
76                        params.put(m.getName(), list);
77                }
78
79                // Get num threads to use
80                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
81                remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
82
83                startQueues();
84
85                // Start this listener
86                this.start();
87        }
88
89        @Override
90        public void run() {
91                while (!killed) {
92                        try {
93                                Delivery delivery = consumer.nextDelivery();
94
95                                logger.debug(UID + " has received a message");
96
97                                remoteWrapper.notifyDelivery(delivery);
98                        } catch (InterruptedException i) {
99                                logger.error(i);
100                        } catch (ShutdownSignalException e) {
101                                logger.error(e);
102                                try {
103                                        if (channel.isOpen()) {
104                                                channel.close();
105                                        }
106                                        startQueues();
107                                } catch (Exception e1) {
108                                        try {
109                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
110                                                Thread.sleep(milis);
111                                        } catch (InterruptedException e2) {
112                                                logger.error(e2);
113                                        }
114                                        logger.error(e1);
115                                }
116                        } catch (ConsumerCancelledException e) {
117                                logger.error(e);
118                        } catch (SerializerException e) {
119                                logger.error(e);
120                        } catch (Exception e) {
121                                logger.error(e);
122                        }
123                }
124        }
125
126        @Override
127        public String getRef() {
128                return UID;
129        }
130
131        @Override
132        public void notifyEvent(Event event) throws IOException, SerializerException {
133                event.setTopic(UID);
134                EventWrapper wrapper = new EventWrapper(event);
135                channel.exchangeDeclare(UID, "fanout");
136                channel.basicPublish(UID, "", null, serializer.serialize(wrapper));
137        }
138
139        public void kill() throws IOException {
140                logger.warn("Killing objectmq: " + this.getRef());
141                killed = true;
142                interrupt();
143                channel.close();
144                remoteWrapper.stopRemoteWrapper();
145        }
146
147        public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
148
149                // Get the specific method identified by methodName and its arguments
150                Method method = loadMethod(methodName, arguments);
151
152                return method.invoke(this, arguments);
153        }
154
155        private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
156                Method m = null;
157
158                // Obtain the class reference
159                Class<?> clazz = this.getClass();
160                Class<?>[] argArray = null;
161
162                if (args != null) {
163                        argArray = new Class<?>[args.length];
164                        for (int i = 0; i < args.length; i++) {
165                                argArray[i] = args[i].getClass();
166                        }
167                }
168
169                try {
170                        m = clazz.getMethod(methodName, argArray);
171                } catch (NoSuchMethodException nsm) {
172                        m = loadMethodWithPrimitives(methodName, argArray);
173                }
174                return m;
175        }
176
177        private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
178                if (argArray != null) {
179                        Method[] methods = this.getClass().getMethods();
180                        int length = argArray.length;
181
182                        for (Method method : methods) {
183                                String name = method.getName();
184                                int argsLength = method.getParameterTypes().length;
185
186                                if (name.equals(methodName) && length == argsLength) {
187                                        // This array can have primitive types inside
188                                        Class<?>[] params = method.getParameterTypes();
189
190                                        boolean found = true;
191
192                                        for (int i = 0; i < length; i++) {
193                                                if (params[i].isPrimitive()) {
194                                                        Class<?> paramWrapper = primitiveClasses.get(params[i].getName());
195
196                                                        if (!paramWrapper.equals(argArray[i])) {
197                                                                found = false;
198                                                                break;
199                                                        }
200                                                }
201                                        }
202                                        if (found) {
203                                                return method;
204                                        }
205                                }
206                        }
207                }
208                throw new NoSuchMethodException(methodName);
209        }
210
211        public List<Class<?>> getParams(String methodName) {
212                return params.get(methodName);
213        }
214
215        public Channel getChannel() {
216                return channel;
217        }
218
219        private void startQueues() throws Exception {
220                // Get info about which exchange and queue will use
221                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
222                String queue = UID;
223                String routingKey = UID;
224                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
225
226                // Start channel
227                channel = broker.getNewChannel();
228
229                // Declares and bindings
230                logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
231                channel.exchangeDeclare(exchange, "direct");
232                channel.queueDeclare(queue, durable, false, false, null);
233                channel.queueBind(queue, exchange, routingKey);
234
235                // Declare the event topic fanout
236                logger.info("RemoteObject: " + UID + " declaring fanout exchange: " + UID);
237                channel.exchangeDeclare(UID, "fanout");
238
239                // Declare a new consumer
240                consumer = new QueueingConsumer(channel);
241                channel.basicConsume(queue, true, consumer);
242        }
243
244        @Override
245        public void addListener(EventListener<?> eventListener) throws Exception {
246        }
247
248        @Override
249        public void removeListener(EventListener<?> eventListener) throws Exception {
250        }
251
252        @Override
253        public Collection<EventListener<?>> getListeners() throws Exception {
254                return null;
255        }
256
257}
Note: See TracBrowser for help on using the repository browser.