source: trunk/objectmq/src/omq/server/remote/request/RemoteObject.java @ 29

Last change on this file since 29 was 29, checked in by stoda, 12 years ago

Singleton didn't worked in parallel threads solved adding synchronized.
Num threads by property added.
Example with different threads added.

File size: 6.6 KB
Line 
1package omq.server.remote.request;
2
3import java.io.IOException;
4import java.lang.reflect.Method;
5import java.util.ArrayList;
6import java.util.Collection;
7import java.util.HashMap;
8import java.util.List;
9import java.util.Map;
10import java.util.Properties;
11
12import omq.Remote;
13import omq.common.broker.Broker;
14import omq.common.event.Event;
15import omq.common.event.EventListener;
16import omq.common.event.EventWrapper;
17import omq.common.util.ParameterQueue;
18import omq.common.util.Serializer;
19import omq.exception.SerializerException;
20
21import com.rabbitmq.client.Channel;
22import com.rabbitmq.client.ConsumerCancelledException;
23import com.rabbitmq.client.QueueingConsumer;
24import com.rabbitmq.client.QueueingConsumer.Delivery;
25import com.rabbitmq.client.ShutdownSignalException;
26
27/**
28 *
29 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
30 *
31 */
32public abstract class RemoteObject extends Thread implements Remote {
33
34        private static final long serialVersionUID = -1778953938739846450L;
35
36        private String UID;
37        private Properties env;
38        private transient RemoteWrapper remoteWrapper;
39        private transient Map<String, List<Class<?>>> params;
40        private transient Channel channel;
41        private transient QueueingConsumer consumer;
42        private transient boolean killed = false;
43
44        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
45
46        static {
47                primitiveClasses.put("byte", Byte.class);
48                primitiveClasses.put("short", Short.class);
49                primitiveClasses.put("char", Character.class);
50                primitiveClasses.put("int", Integer.class);
51                primitiveClasses.put("long", Long.class);
52                primitiveClasses.put("float", Float.class);
53                primitiveClasses.put("double", Double.class);
54        }
55
56        public RemoteObject() {
57        }
58
59        public void startRemoteObject(String reference, Properties env) throws Exception {
60                this.UID = reference;
61                this.env = env;
62
63                params = new HashMap<String, List<Class<?>>>();
64                for (Method m : this.getClass().getMethods()) {
65                        List<Class<?>> list = new ArrayList<Class<?>>();
66                        for (Class<?> clazz : m.getParameterTypes()) {
67                                list.add(clazz);
68                        }
69                        params.put(m.getName(), list);
70                }
71
72                // Get num threads to use
73                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
74                remoteWrapper = new RemoteWrapper(this, numThreads);
75
76                startQueues();
77
78                // Start this listener
79                this.start();
80        }
81
82        @Override
83        public void run() {
84                while (!killed) {
85                        try {
86                                Delivery delivery = consumer.nextDelivery();
87                                System.out.println("RemoteObject: " + UID + " has received a message");
88                                remoteWrapper.notifyDelivery(delivery);
89                        } catch (InterruptedException i) {
90                                i.printStackTrace();
91                        } catch (ShutdownSignalException e) {
92                                e.printStackTrace();
93                                try {
94                                        if (channel.isOpen()) {
95                                                channel.close();
96                                        }
97                                        startQueues();
98                                } catch (Exception e1) {
99                                        try {
100                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
101                                                Thread.sleep(milis);
102                                        } catch (InterruptedException e2) {
103                                                // TODO Auto-generated catch block
104                                                e2.printStackTrace();
105                                        }
106                                        e1.printStackTrace();
107                                }
108                        } catch (ConsumerCancelledException e) {
109                                e.printStackTrace();
110                        } catch (SerializerException e) {
111                                e.printStackTrace();
112                        } catch (Exception e) {
113                                e.printStackTrace();
114                        }
115                }
116        }
117
118        @Override
119        public String getRef() {
120                return UID;
121        }
122
123        @Override
124        public void notifyEvent(Event event) throws IOException, SerializerException {
125                event.setTopic(UID);
126                EventWrapper wrapper = new EventWrapper(event);
127                channel.exchangeDeclare(UID, "fanout");
128                channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));
129        }
130
131        public void kill() throws IOException {
132                interrupt();
133                killed = true;
134                channel.close();
135                remoteWrapper.stopRemoteWrapper();
136        }
137
138        public Object invokeMethod(String methodName, Object[] arguments) throws Exception {
139
140                // Get the specific method identified by methodName and its arguments
141                Method method = loadMethod(methodName, arguments);
142
143                return method.invoke(this, arguments);
144        }
145
146        private Method loadMethod(String methodName, Object[] args) throws NoSuchMethodException {
147                Method m = null;
148
149                // Obtain the class reference
150                Class<?> clazz = this.getClass();
151                Class<?>[] argArray = null;
152
153                if (args != null) {
154                        argArray = new Class<?>[args.length];
155                        for (int i = 0; i < args.length; i++) {
156                                argArray[i] = args[i].getClass();
157                        }
158                }
159
160                try {
161                        m = clazz.getMethod(methodName, argArray);
162                } catch (NoSuchMethodException nsm) {
163                        m = loadMethodWithPrimitives(methodName, argArray);
164                }
165                return m;
166        }
167
168        private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException {
169                Method[] methods = this.getClass().getMethods();
170                int length = argArray.length;
171
172                for (Method method : methods) {
173                        String name = method.getName();
174                        int argsLength = method.getParameterTypes().length;
175
176                        if (name.equals(methodName) && length == argsLength) {
177                                // This array can have primitive types inside
178                                Class<?>[] params = method.getParameterTypes();
179
180                                boolean found = true;
181
182                                for (int i = 0; i < length; i++) {
183                                        if (params[i].isPrimitive()) {
184                                                Class<?> paramWrapper = primitiveClasses.get(params[i].getName());
185
186                                                if (!paramWrapper.equals(argArray[i])) {
187                                                        found = false;
188                                                        break;
189                                                }
190                                        }
191                                }
192                                if (found) {
193                                        return method;
194                                }
195                        }
196                }
197                throw new NoSuchMethodException(methodName);
198        }
199
200        public List<Class<?>> getParams(String methodName) {
201                return params.get(methodName);
202        }
203
204        public Channel getChannel() {
205                return channel;
206        }
207
208        private void startQueues() throws Exception {
209                // Get info about which exchange and queue will use
210                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
211                String queue = UID;
212                String routingKey = UID;
213
214                // Start channel
215                channel = Broker.getNewChannel();
216
217                // Declares and bindings
218                System.out.println("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
219                channel.exchangeDeclare(exchange, "direct");
220                channel.queueDeclare(queue, false, false, false, null);
221                channel.queueBind(queue, exchange, routingKey);
222
223                // Declare the event topic fanout
224                System.out.println("RemoteObject: " + UID + " declaring fanout exchange: " + UID);
225                channel.exchangeDeclare(UID, "fanout");
226
227                // Declare a new consumer
228                consumer = new QueueingConsumer(channel);
229                channel.basicConsume(queue, true, consumer);
230        }
231
232        @Override
233        public void addListener(EventListener eventListener) throws Exception {
234        }
235
236        @Override
237        public void removeListener(EventListener eventListener) throws Exception {
238        }
239
240        @Override
241        public Collection<EventListener> getListeners() throws Exception {
242                return null;
243        }
244
245}
Note: See TracBrowser for help on using the repository browser.