source: branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java @ 106

Last change on this file since 106 was 105, checked in by stoda, 11 years ago

UID changed to reference
TODO: add UID

File size: 11.1 KB
Line 
1package omq.client.proxy;
2
3import java.lang.reflect.Array;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.util.ArrayList;
7import java.util.HashMap;
8import java.util.List;
9import java.util.Map;
10import java.util.Properties;
11
12import omq.Remote;
13import omq.client.annotation.AsyncMethod;
14import omq.client.annotation.MultiMethod;
15import omq.client.annotation.SyncMethod;
16import omq.client.listener.ResponseListener;
17import omq.common.broker.Broker;
18import omq.common.message.Request;
19import omq.common.message.Response;
20import omq.common.util.ParameterQueue;
21import omq.common.util.Serializer;
22import omq.exception.OmqException;
23import omq.exception.RetryException;
24import omq.exception.TimeoutException;
25
26import org.apache.log4j.Logger;
27
28import com.rabbitmq.client.AMQP.BasicProperties;
29
30/**
31 * Proxymq class. This class inherits from InvocationHandler, for this reason
32 * each proxymq instance has an associated invocation handler. When a method is
33 * invoked on a proxymq instance, the method invocation is encoded and
34 * dispatched to the invoke method of its invocation handler.
35 *
36 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
37 *
38 */
39public class Proxymq implements InvocationHandler, Remote {
40
41        /**
42         *
43         */
44        private static final long serialVersionUID = 1L;
45        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
46        private static final String multi = "multi#";
47
48        private String reference;
49        private transient String exchange;
50        private transient String multiExchange;
51        private transient String replyQueueName;
52        private transient String serializerType;
53        private transient Broker broker;
54        private transient ResponseListener rListener;
55        private transient Serializer serializer;
56        private transient Properties env;
57        private transient Integer deliveryMode = null;
58        private transient Map<String, byte[]> results;
59
60        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
61        static {
62                primitiveClasses.put("byte", Byte.class);
63                primitiveClasses.put("short", Short.class);
64                primitiveClasses.put("char", Character.class);
65                primitiveClasses.put("int", Integer.class);
66                primitiveClasses.put("long", Long.class);
67                primitiveClasses.put("float", Float.class);
68                primitiveClasses.put("double", Double.class);
69        }
70
71        /**
72         * Proxymq Constructor.
73         *
74         * This constructor uses an reference to know which object will call. It also uses
75         * Properties to set where to send the messages
76         *
77         * @param reference
78         *            The reference represents the unique identifier of a remote object
79         * @param clazz
80         *            It represents the real class of the remote object. With this
81         *            class the system can know the remoteInterface used and it can
82         *            also see which annotations are used
83         * @param env
84         *            The environment is used to know where to send the messages
85         * @throws Exception
86         */
87        public Proxymq(String reference, Class<?> clazz, Broker broker) throws Exception {
88                this.reference = reference;
89                this.broker = broker;
90                rListener = broker.getResponseListener();
91                serializer = broker.getSerializer();
92
93                // TODO what is better to use a new channel or to use the same?
94                // this.channel = Broker.getChannel();
95                env = broker.getEnvironment();
96                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
97                multiExchange = multi + reference;
98                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
99
100                // set the serializer type
101                serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA);
102                if (env.getProperty(ParameterQueue.DELIVERY_MODE) != null) {
103                        deliveryMode = Integer.parseInt(env.getProperty(ParameterQueue.DELIVERY_MODE));
104                }
105
106                // Create a new hashmap and registry it in rListener
107                results = new HashMap<String, byte[]>();
108                rListener.registerProxy(this);
109        }
110
111        @Override
112        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
113                // Local methods only
114                String methodName = method.getName();
115
116                // The local methods will be invoked here
117                if (method.getDeclaringClass().equals(Remote.class)) {
118                        if (methodName.equals("getRef")) {
119                                return getRef();
120                        }
121                        if (methodName.equals("equals")) {
122                                if (arguments[0] instanceof Remote) {
123                                        return getRef().equals(((Remote) arguments[0]).getRef());
124                                } else {
125                                        return false;
126                                }
127                        }
128                }
129
130                // Create the request
131                Request request = createRequest(method, arguments);
132
133                Object response = null;
134                // Publish the request
135                if (request.isAsync()) {
136                        publishMessage(request, replyQueueName);
137                } else {
138                        response = publishSyncRequest(request, method.getReturnType());
139                }
140
141                return response;
142        }
143
144        /**
145         * This method publishes a request
146         *
147         * @param request
148         *            - this request contains which method and which params will be
149         *            invoked in the server side.
150         * @param replyQueueName
151         *            - this param indicates where the responseListener will be
152         *            listen to.
153         * @throws Exception
154         */
155        private void publishMessage(Request request, String replyQueueName) throws Exception {
156                String corrId = request.getId();
157
158                // Get the environment properties
159                String exchange;
160                String routingkey;
161
162                if (request.isMulti()) {
163                        exchange = multiExchange;
164                        routingkey = "";
165                } else {
166                        exchange = this.exchange;
167                        routingkey = reference;
168                }
169
170                // Add the correlation ID and create a replyTo property
171                BasicProperties props = new BasicProperties.Builder().appId(reference).correlationId(corrId).replyTo(replyQueueName).type(serializerType)
172                                .deliveryMode(deliveryMode).build();
173
174                // Publish the message
175                byte[] bytesRequest = serializer.serialize(serializerType, request);
176                broker.publishMessge(exchange, routingkey, props, bytesRequest);
177                logger.debug("Proxymq: " + reference + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: "
178                                + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()
179                                + ", delivery mode: " + deliveryMode);
180        }
181
182        /**
183         * This method publishes a synchronous request
184         *
185         * @param request
186         *            - this request contains which method and which params will be
187         *            invoked in the server side.
188         * @param type
189         *            - indicates which return type we are waiting for
190         * @return serverResponse
191         * @throws Exception
192         */
193        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
194                String corrId = request.getId();
195
196                int retries = request.getRetries();
197                long timeout = request.getTimeout();
198
199                // Publish the message
200                int i = 0;
201                while (i < retries) {
202                        try {
203                                publishMessage(request, replyQueueName);
204                                if (request.isMulti()) {
205                                        return getResults(corrId, timeout, type);
206                                } else {
207                                        return getResult(corrId, timeout, type);
208                                }
209
210                        } catch (TimeoutException te) {
211                                logger.error(te);
212                        }
213                        i++;
214                }
215                throw new RetryException(retries, timeout);
216        }
217
218        /**
219         * This method creates a request using the annotations of the Remote
220         * interface
221         *
222         * @param method
223         *            - method to invoke in the server side
224         * @param arguments
225         *            - arguments of the method
226         * @return new Request
227         */
228        private Request createRequest(Method method, Object[] arguments) {
229                String corrId = java.util.UUID.randomUUID().toString();
230                String methodName = method.getName();
231                boolean multi = false;
232
233                if (method.getAnnotation(MultiMethod.class) != null) {
234                        multi = true;
235                }
236
237                // Since we need to know whether the method is async and if it has to
238                // return using an annotation, we'll only check the AsyncMethod
239                // annotation
240                if (method.getAnnotation(AsyncMethod.class) == null) {
241                        int retries = 1;
242                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
243                        if (method.getAnnotation(SyncMethod.class) != null) {
244                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
245                                retries = sync.retry();
246                                timeout = sync.timeout();
247                        }
248                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi);
249                } else {
250                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
251                }
252        }
253
254        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
255                Response resp = null;
256
257                // Wait for the results.
258                long localTimeout = timeout;
259                long start = System.currentTimeMillis();
260                synchronized (results) {
261                        // Due to we are using notifyAll(), we need to control the real time
262                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
263                                results.wait(localTimeout);
264                                localTimeout = System.currentTimeMillis() - start;
265                        }
266                        if ((timeout - localTimeout) <= 0) {
267                                throw new TimeoutException("Timeout exception time: " + timeout);
268                        }
269                        resp = serializer.deserializeResponse(results.get(corrId), type);
270
271                        // Remove and indicate the key exists (a hashmap can contain a null
272                        // object, using this we'll know whether a response has been
273                        // received before)
274                        results.put(corrId, null);
275                }
276
277                if (resp.getError() != null) {
278                        OmqException error = resp.getError();
279                        String name = error.getType();
280                        String message = error.getMessage();
281                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
282                }
283
284                return resp.getResult();
285        }
286
287        /**
288         * This method returns an array with length @MultiMethod.waitNum() with all
289         * the responses received.
290         *
291         * @param corrId
292         *            - Correlation Id of the request
293         * @param timeout
294         *            - Timeout read in @SyncMethod.timeout(). If the timeout is set
295         *            in 2 seconds, the system will wait 2 seconds for the arriving
296         *            of all the responses.
297         * @param type
298         *            - Must be an Array type
299         * @return resultArray
300         * @throws Exception
301         */
302        private Object getResults(String corrId, long timeout, Class<?> type) throws Exception {
303                Response resp = null;
304                // Get the component type of an array
305                Class<?> actualType = type.getComponentType();
306
307                List<Object> list = new ArrayList<Object>();
308
309                int i = 0;
310                long localTimeout = timeout;
311                long start = System.currentTimeMillis();
312
313                while (true) {
314                        synchronized (results) {
315                                // Due to we are using notifyAll(), we need to control the real
316                                // time
317                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
318                                        results.wait(localTimeout);
319                                        localTimeout = System.currentTimeMillis() - start;
320                                }
321                                if ((timeout - localTimeout) <= 0) {
322                                        break;
323                                }
324                                // Remove the corrId to receive new replies
325                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
326                                list.add(resp.getResult());
327                        }
328                        i++;
329                }
330
331                if (i == 0) {
332                        results.remove(corrId);
333                        throw new TimeoutException("Timeout exception time: " + timeout);
334                }
335
336                synchronized (results) {
337                        results.put(corrId, null);
338                }
339
340                Object array = Array.newInstance(actualType, i);
341                i = 0;
342                for (Object o : list) {
343                        Array.set(array, i++, o);
344                }
345
346                return array;
347        }
348
349        /**
350         * Gets the Map used internally to retreive the response of the server
351         *
352         * @return a map with all the keys processed. Every key is a correlation id
353         *         of a method invoked remotely
354         */
355        public Map<String, byte[]> getResults() {
356                return results;
357        }
358
359        @Override
360        public String getRef() {
361                return reference;
362        }
363
364}
Note: See TracBrowser for help on using the repository browser.