source: branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java @ 92

Last change on this file since 92 was 92, checked in by stoda, 11 years ago

TODO: delete in supervisor
check the code

File size: 11.0 KB
Line 
1package omq.client.proxy;
2
3import java.lang.reflect.Array;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Properties;
9
10import omq.Remote;
11import omq.client.annotation.AsyncMethod;
12import omq.client.annotation.MultiMethod;
13import omq.client.annotation.SyncMethod;
14import omq.client.listener.ResponseListener;
15import omq.common.broker.Broker;
16import omq.common.message.Request;
17import omq.common.message.Response;
18import omq.common.util.ParameterQueue;
19import omq.common.util.Serializer;
20import omq.exception.OmqException;
21import omq.exception.RetryException;
22import omq.exception.TimeoutException;
23
24import org.apache.log4j.Logger;
25
26import com.rabbitmq.client.AMQP.BasicProperties;
27
28/**
29 * Proxymq class. This class inherits from InvocationHandler, for this reason
30 * each proxymq instance has an associated invocation handler. When a method is
31 * invoked on a proxymq instance, the method invocation is encoded and
32 * dispatched to the invoke method of its invocation handler.
33 *
34 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
35 *
36 */
37public class Proxymq implements InvocationHandler, Remote {
38
39        /**
40         *
41         */
42        private static final long serialVersionUID = 1L;
43        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
44        private static final String multi = "multi#";
45
46        private String uid;
47        private transient String exchange;
48        private transient String multiExchange;
49        private transient String replyQueueName;
50        private transient String serializerType;
51        private transient Broker broker;
52        private transient ResponseListener rListener;
53        private transient Serializer serializer;
54        private transient Properties env;
55        private transient Integer deliveryMode = null;
56        private transient Map<String, byte[]> results;
57
58        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
59        static {
60                primitiveClasses.put("byte", Byte.class);
61                primitiveClasses.put("short", Short.class);
62                primitiveClasses.put("char", Character.class);
63                primitiveClasses.put("int", Integer.class);
64                primitiveClasses.put("long", Long.class);
65                primitiveClasses.put("float", Float.class);
66                primitiveClasses.put("double", Double.class);
67        }
68
69        /**
70         * Proxymq Constructor.
71         *
72         * This constructor uses an uid to know which object will call. It also uses
73         * Properties to set where to send the messages
74         *
75         * @param uid
76         *            The uid represents the unique identifier of a remote object
77         * @param clazz
78         *            It represents the real class of the remote object. With this
79         *            class the system can know the remoteInterface used and it can
80         *            also see which annotations are used
81         * @param env
82         *            The environment is used to know where to send the messages
83         * @throws Exception
84         */
85        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
86                this.uid = uid;
87                this.broker = broker;
88                rListener = broker.getResponseListener();
89                serializer = broker.getSerializer();
90
91                // TODO what is better to use a new channel or to use the same?
92                // this.channel = Broker.getChannel();
93                env = broker.getEnvironment();
94                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
95                multiExchange = multi + uid;
96                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
97
98                // set the serializer type
99                serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA);
100                if (env.getProperty(ParameterQueue.DELIVERY_MODE) != null) {
101                        deliveryMode = Integer.parseInt(env.getProperty(ParameterQueue.DELIVERY_MODE));
102                }
103
104                // Create a new hashmap and registry it in rListener
105                results = new HashMap<String, byte[]>();
106                rListener.registerProxy(this);
107        }
108
109        @Override
110        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
111                // Local methods only
112                String methodName = method.getName();
113
114                // The local methods will be invoked here
115                if (method.getDeclaringClass().equals(Remote.class)) {
116                        if (methodName.equals("getRef")) {
117                                return getRef();
118                        }
119                        if (methodName.equals("equals")) {
120                                if (arguments[0] instanceof Remote) {
121                                        return getRef().equals(((Remote) arguments[0]).getRef());
122                                } else {
123                                        return false;
124                                }
125                        }
126                }
127
128                // Create the request
129                Request request = createRequest(method, arguments);
130
131                Object response = null;
132                // Publish the request
133                if (request.isAsync()) {
134                        publishMessage(request, replyQueueName);
135                } else {
136                        response = publishSyncRequest(request, method.getReturnType());
137                }
138
139                return response;
140        }
141
142        /**
143         * This method publishes a request
144         *
145         * @param request
146         *            - this request contains which method and which params will be
147         *            invoked in the server side.
148         * @param replyQueueName
149         *            - this param indicates where the responseListener will be
150         *            listen to.
151         * @throws Exception
152         */
153        private void publishMessage(Request request, String replyQueueName) throws Exception {
154                String corrId = request.getId();
155
156                // Get the environment properties
157                String exchange;
158                String routingkey;
159
160                if (request.isMulti()) {
161                        exchange = multiExchange;
162                        routingkey = "";
163                } else {
164                        exchange = this.exchange;
165                        routingkey = uid;
166                }
167
168                // Add the correlation ID and create a replyTo property
169                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType)
170                                .deliveryMode(deliveryMode).build();
171
172                // Publish the message
173                byte[] bytesRequest = serializer.serialize(serializerType, request);
174                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
175                logger.debug("Proxymq: " + uid + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: "
176                                + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()
177                                + ", delivery mode: " + deliveryMode);
178        }
179
180        /**
181         * This method publishes a synchronous request
182         *
183         * @param request
184         *            - this request contains which method and which params will be
185         *            invoked in the server side.
186         * @param type
187         *            - indicates which return type we are waiting for
188         * @return serverResponse
189         * @throws Exception
190         */
191        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
192                String corrId = request.getId();
193
194                int retries = request.getRetries();
195                long timeout = request.getTimeout();
196
197                // Publish the message
198                int i = 0;
199                while (i < retries) {
200                        try {
201                                publishMessage(request, replyQueueName);
202                                if (request.isMulti()) {
203                                        return getResults(corrId, request.getWait(), timeout, type);
204                                } else {
205                                        return getResult(corrId, timeout, type);
206                                }
207
208                        } catch (TimeoutException te) {
209                                logger.error(te);
210                        }
211                        i++;
212                }
213                throw new RetryException(retries, timeout);
214        }
215
216        /**
217         * This method creates a request using the annotations of the Remote
218         * interface
219         *
220         * @param method
221         *            - method to invoke in the server side
222         * @param arguments
223         *            - arguments of the method
224         * @return new Request
225         */
226        private Request createRequest(Method method, Object[] arguments) {
227                String corrId = java.util.UUID.randomUUID().toString();
228                String methodName = method.getName();
229                boolean multi = false;
230                int wait = 0;
231
232                if (method.getAnnotation(MultiMethod.class) != null) {
233                        multi = true;
234                        wait = method.getAnnotation(MultiMethod.class).waitNum();
235                }
236
237                // Since we need to know whether the method is async and if it has to
238                // return using an annotation, we'll only check the AsyncMethod
239                // annotation
240                if (method.getAnnotation(AsyncMethod.class) == null) {
241                        int retries = 1;
242                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
243                        if (method.getAnnotation(SyncMethod.class) != null) {
244                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
245                                retries = sync.retry();
246                                timeout = sync.timeout();
247                        }
248                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
249                } else {
250                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
251                }
252        }
253
254        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
255                Response resp = null;
256
257                // Wait for the results.
258                long localTimeout = timeout;
259                long start = System.currentTimeMillis();
260                synchronized (results) {
261                        // Due to we are using notifyAll(), we need to control the real time
262                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
263                                results.wait(localTimeout);
264                                localTimeout = System.currentTimeMillis() - start;
265                        }
266                        if ((timeout - localTimeout) <= 0) {
267                                throw new TimeoutException("Timeout exception time: " + timeout);
268                        }
269                        resp = serializer.deserializeResponse(results.get(corrId), type);
270
271                        // Remove and indicate the key exists (a hashmap can contain a null
272                        // object, using this we'll know whether a response has been
273                        // received before)
274                        results.put(corrId, null);
275                }
276
277                if (resp.getError() != null) {
278                        OmqException error = resp.getError();
279                        String name = error.getType();
280                        String message = error.getMessage();
281                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
282                }
283
284                return resp.getResult();
285        }
286
287        /**
288         * This method returns an array with length @MultiMethod.waitNum() with all
289         * the responses received.
290         *
291         * @param corrId
292         *            - Correlation Id of the request
293         * @param wait
294         *            - Array length
295         * @param timeout
296         *            - Timeout read in @SyncMethod.timeout(). If the timeout is set
297         *            in 2 seconds, the system will wait 2 seconds for the arriving
298         *            of all the responses.
299         * @param type
300         *            - Must be an Array type
301         * @return resultArray
302         * @throws Exception
303         */
304        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
305                Response resp = null;
306                // Get the component type of an array
307                Class<?> actualType = type.getComponentType();
308
309                Object array = Array.newInstance(actualType, wait);
310
311                int i = 0;
312                long localTimeout = timeout;
313                long start = System.currentTimeMillis();
314
315                while (i < wait) {
316                        synchronized (results) {
317                                // Due to we are using notifyAll(), we need to control the real
318                                // time
319                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
320                                        results.wait(localTimeout);
321                                        localTimeout = System.currentTimeMillis() - start;
322                                }
323                                if ((timeout - localTimeout) <= 0) {
324                                        throw new TimeoutException("Timeout exception time: " + timeout);
325                                }
326                                // Remove the corrId to receive new replies
327                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
328                                Array.set(array, i, resp.getResult());
329                        }
330                        i++;
331                }
332                synchronized (results) {
333                        results.put(corrId, null);
334                }
335
336                return array;
337        }
338
339        /**
340         * Gets the Map used internally to retreive the response of the server
341         *
342         * @return a map with all the keys processed. Every key is a correlation id
343         *         of a method invoked remotely
344         */
345        public Map<String, byte[]> getResults() {
346                return results;
347        }
348
349        @Override
350        public String getRef() {
351                return uid;
352        }
353
354}
Note: See TracBrowser for help on using the repository browser.