source: trunk/src/main/java/omq/server/RemoteObject.java @ 70

Last change on this file since 70 was 67, checked in by stoda, 11 years ago

EventTest? added

File size: 7.7 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.Collection;
7import java.util.HashMap;
8import java.util.List;
9import java.util.Map;
10import java.util.Properties;
11
12import org.apache.log4j.Logger;
13
14import omq.Remote;
15import omq.common.broker.Broker;
16import omq.common.event.Event;
17import omq.common.event.EventListener;
18import omq.common.event.EventWrapper;
19import omq.common.util.ParameterQueue;
20import omq.common.util.Serializer;
21import omq.exception.SerializerException;
22
23import com.rabbitmq.client.Channel;
24import com.rabbitmq.client.ConsumerCancelledException;
25import com.rabbitmq.client.QueueingConsumer;
26import com.rabbitmq.client.QueueingConsumer.Delivery;
27import com.rabbitmq.client.ShutdownSignalException;
28
29/**
30 *
31 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
32 *
33 */
34public abstract class RemoteObject extends Thread implements Remote {
35
36        private static final long serialVersionUID = -1778953938739846450L;
37        private static final String multi = "multi#";
38        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
39
40        private String UID;
41        private String multiQueue;
42        private Properties env;
43        private transient Broker broker;
44        private transient Serializer serializer;
45        private transient RemoteWrapper remoteWrapper;
46        private transient Map<String, List<Class<?>>> params;
47        private transient Channel channel;
48        private transient QueueingConsumer consumer;
49        private transient boolean killed = false;
50
51        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
52
53        static {
54                primitiveClasses.put("byte", Byte.class);
55                primitiveClasses.put("short", Short.class);
56                primitiveClasses.put("char", Character.class);
57                primitiveClasses.put("int", Integer.class);
58                primitiveClasses.put("long", Long.class);
59                primitiveClasses.put("float", Float.class);
60                primitiveClasses.put("double", Double.class);
61        }
62
63        public RemoteObject() {
64        }
65
66        public void startRemoteObject(String reference, Broker broker) throws Exception {
67                this.broker = broker;
68                UID = reference;
69                multiQueue = UID + System.currentTimeMillis();
70                env = broker.getEnvironment();
71                serializer = broker.getSerializer();
72
73                params = new HashMap<String, List<Class<?>>>();
74                for (Method m : this.getClass().getMethods()) {
75                        List<Class<?>> list = new ArrayList<Class<?>>();
76                        for (Class<?> clazz : m.getParameterTypes()) {
77                                list.add(clazz);
78                        }
79                        params.put(m.getName(), list);
80                }
81
82                // Get num threads to use
83                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
84                remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
85
86                startQueues();
87
88                // Start this listener
89                this.start();
90        }
91
92        public void startTriggerEvent(String reference, Broker broker) throws Exception {
93                this.broker = broker;
94                UID = reference;
95                serializer = broker.getSerializer();
96                if (channel == null || !channel.isOpen()) {
97                        channel = broker.getChannel();
98                }
99        }
100
101        @Override
102        public void run() {
103                while (!killed) {
104                        try {
105                                Delivery delivery = consumer.nextDelivery();
106
107                                logger.debug(UID + " has received a message");
108
109                                remoteWrapper.notifyDelivery(delivery);
110                        } catch (InterruptedException i) {
111                                logger.error(i);
112                        } catch (ShutdownSignalException e) {
113                                logger.error(e);
114                                try {
115                                        if (channel.isOpen()) {
116                                                channel.close();
117                                        }
118                                        startQueues();
119                                } catch (Exception e1) {
120                                        try {
121                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
122                                                Thread.sleep(milis);
123                                        } catch (InterruptedException e2) {
124                                                logger.error(e2);
125                                        }
126                                        logger.error(e1);
127                                }
128                        } catch (ConsumerCancelledException e) {
129                                logger.error(e);
130                        } catch (SerializerException e) {
131                                logger.error(e);
132                        } catch (Exception e) {
133                                logger.error(e);
134                        }
135                }
136        }
137
138        @Override
139        public String getRef() {
140                return UID;
141        }
142
143        @Override
144        public void notifyEvent(Event event) throws IOException, SerializerException {
145                String corrID = java.util.UUID.randomUUID().toString();
146                event.setTopic(UID);
147                event.setCorrId(corrID);
148                EventWrapper wrapper = new EventWrapper(event);
149                channel.exchangeDeclare(UID, "fanout");
150                channel.basicPublish(UID, "", null, serializer.serialize(wrapper));
151                logger.debug("Sending event-> topic: " + UID + ", corrID: " + corrID);
152        }
153
154        public void kill() throws IOException {
155                logger.warn("Killing objectmq: " + this.getRef());
156                killed = true;
157                interrupt();
158                channel.close();
159                remoteWrapper.stopRemoteWrapper();
160        }
161
162        public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
163
164                // Get the specific method identified by methodName and its arguments
165                Method method = loadMethod(methodName, arguments);
166
167                return method.invoke(this, arguments);
168        }
169
170        private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
171                Method m = null;
172
173                // Obtain the class reference
174                Class<?> clazz = this.getClass();
175                Class<?>[] argArray = null;
176
177                if (args != null) {
178                        argArray = new Class<?>[args.length];
179                        for (int i = 0; i < args.length; i++) {
180                                argArray[i] = args[i].getClass();
181                        }
182                }
183
184                try {
185                        m = clazz.getMethod(methodName, argArray);
186                } catch (NoSuchMethodException nsm) {
187                        m = loadMethodWithPrimitives(methodName, argArray);
188                }
189                return m;
190        }
191
192        private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
193                if (argArray != null) {
194                        Method[] methods = this.getClass().getMethods();
195                        int length = argArray.length;
196
197                        for (Method method : methods) {
198                                String name = method.getName();
199                                int argsLength = method.getParameterTypes().length;
200
201                                if (name.equals(methodName) && length == argsLength) {
202                                        // This array can have primitive types inside
203                                        Class<?>[] params = method.getParameterTypes();
204
205                                        boolean found = true;
206
207                                        for (int i = 0; i < length; i++) {
208                                                if (params[i].isPrimitive()) {
209                                                        Class<?> paramWrapper = primitiveClasses.get(params[i].getName());
210
211                                                        if (!paramWrapper.equals(argArray[i])) {
212                                                                found = false;
213                                                                break;
214                                                        }
215                                                }
216                                        }
217                                        if (found) {
218                                                return method;
219                                        }
220                                }
221                        }
222                }
223                throw new NoSuchMethodException(methodName);
224        }
225
226        public List<Class<?>> getParams(String methodName) {
227                return params.get(methodName);
228        }
229
230        public Channel getChannel() {
231                return channel;
232        }
233
234        private void startQueues() throws Exception {
235                // Get info about which exchange and queue will use
236                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
237                String queue = UID;
238                String routingKey = UID;
239                // Multi info
240                String multiExchange = multi + exchange;
241
242                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
243
244                // Start channel
245                channel = broker.getNewChannel();
246
247                // Declares and bindings
248                logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
249                channel.exchangeDeclare(exchange, "direct");
250                channel.queueDeclare(queue, durable, false, false, null);
251                channel.queueBind(queue, exchange, routingKey);
252
253                channel.exchangeDeclare(multiExchange, "fanout");
254                channel.queueDeclare(multiQueue, durable, false, false, null);
255                channel.queueBind(multiQueue, multiExchange, "");
256
257                // Declare the event topic fanout
258                logger.info("RemoteObject: " + UID + " declaring fanout exchange: " + UID);
259                channel.exchangeDeclare(UID, "fanout");
260
261                // Declare a new consumer
262                consumer = new QueueingConsumer(channel);
263                channel.basicConsume(queue, true, consumer);
264                channel.basicConsume(multiQueue, true, consumer);
265        }
266
267        @Override
268        public void addListener(EventListener<?> eventListener) throws Exception {
269        }
270
271        @Override
272        public void removeListener(EventListener<?> eventListener) throws Exception {
273        }
274
275        @Override
276        public Collection<EventListener<?>> getListeners() throws Exception {
277                return null;
278        }
279
280}
Note: See TracBrowser for help on using the repository browser.