source: branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java @ 111

Last change on this file since 111 was 111, checked in by stoda, 11 years ago

Broker:

tryconnection won't be used
DOMConfigurator won't be used


File size: 11.5 KB
Line 
1package omq.client.proxy;
2
3import java.lang.reflect.Array;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.util.ArrayList;
7import java.util.HashMap;
8import java.util.List;
9import java.util.Map;
10import java.util.Properties;
11
12import omq.Remote;
13import omq.client.annotation.AsyncMethod;
14import omq.client.annotation.MultiMethod;
15import omq.client.annotation.SyncMethod;
16import omq.client.listener.ResponseListener;
17import omq.common.broker.Broker;
18import omq.common.message.Request;
19import omq.common.message.Response;
20import omq.common.util.ParameterQueue;
21import omq.common.util.Serializer;
22import omq.exception.OmqException;
23import omq.exception.RetryException;
24import omq.exception.TimeoutException;
25
26import org.apache.log4j.Logger;
27
28import com.rabbitmq.client.AMQP.BasicProperties;
29
30/**
31 * Proxymq class. This class inherits from InvocationHandler, for this reason
32 * each proxymq instance has an associated invocation handler. When a method is
33 * invoked on a proxymq instance, the method invocation is encoded and
34 * dispatched to the invoke method of its invocation handler.
35 *
36 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
37 *
38 */
39public class Proxymq implements InvocationHandler, Remote {
40
41        /**
42         *
43         */
44        private static final long serialVersionUID = 1L;
45        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
46        private static final String multi = "multi#";
47
48        private String reference;
49        private String UID;
50        private transient String exchange;
51        private transient String multiExchange;
52        private transient String replyQueueName;
53        private transient String serializerType;
54        private transient Broker broker;
55        private transient ResponseListener rListener;
56        private transient Serializer serializer;
57        private transient Properties env;
58        private transient Integer deliveryMode = null;
59        private transient Map<String, byte[]> results;
60
61        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
62        static {
63                primitiveClasses.put("byte", Byte.class);
64                primitiveClasses.put("short", Short.class);
65                primitiveClasses.put("char", Character.class);
66                primitiveClasses.put("int", Integer.class);
67                primitiveClasses.put("long", Long.class);
68                primitiveClasses.put("float", Float.class);
69                primitiveClasses.put("double", Double.class);
70        }
71
72        /**
73         * Proxymq Constructor.
74         *
75         * This constructor uses an reference to know which object will call. It
76         * also uses Properties to set where to send the messages
77         *
78         * @param reference
79         *            The reference represents the unique identifier of a remote
80         *            object
81         * @param clazz
82         *            It represents the real class of the remote object. With this
83         *            class the system can know the remoteInterface used and it can
84         *            also see which annotations are used
85         * @param env
86         *            The environment is used to know where to send the messages
87         * @throws Exception
88         */
89        public Proxymq(String reference, Class<?> clazz, Broker broker) throws Exception {
90                this.reference = reference;
91                this.broker = broker;
92                rListener = broker.getResponseListener();
93                serializer = broker.getSerializer();
94
95                // TODO what is better to use a new channel or to use the same?
96                // this.channel = Broker.getChannel();
97                env = broker.getEnvironment();
98                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
99                multiExchange = multi + reference;
100                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
101
102                // set the serializer type
103                serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA);
104                if (env.getProperty(ParameterQueue.DELIVERY_MODE) != null) {
105                        deliveryMode = Integer.parseInt(env.getProperty(ParameterQueue.DELIVERY_MODE));
106                }
107
108                // Create a new hashmap and registry it in rListener
109                results = new HashMap<String, byte[]>();
110                rListener.registerProxy(this);
111        }
112
113        @Override
114        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
115                // Local methods only
116                String methodName = method.getName();
117
118                // The local methods will be invoked here
119                if (method.getDeclaringClass().equals(Remote.class)) {
120                        if (methodName.equals("getRef")) {
121                                return getRef();
122                        }
123                        if (methodName.equals("getUID")) {
124                                return getUID();
125                        }
126                        if (methodName.equals("setUID")) {
127                                setUID((String) arguments[0]);
128                                return null;
129                        }
130                        if (methodName.equals("equals")) {
131                                if (arguments[0] instanceof Remote) {
132                                        return getRef().equals(((Remote) arguments[0]).getRef());
133                                } else {
134                                        return false;
135                                }
136                        }
137                }
138
139                // Create the request
140                Request request = createRequest(method, arguments);
141
142                Object response = null;
143                // Publish the request
144                if (request.isAsync()) {
145                        publishMessage(request, replyQueueName);
146                } else {
147                        response = publishSyncRequest(request, method.getReturnType());
148                }
149
150                return response;
151        }
152
153        /**
154         * This method publishes a request
155         *
156         * @param request
157         *            - this request contains which method and which params will be
158         *            invoked in the server side.
159         * @param replyQueueName
160         *            - this param indicates where the responseListener will be
161         *            listen to.
162         * @throws Exception
163         */
164        private void publishMessage(Request request, String replyQueueName) throws Exception {
165                String corrId = request.getId();
166
167                // Get the environment properties
168                String exchange;
169                String routingkey;
170
171                if (request.isMulti()) {
172                        exchange = multiExchange;
173                        routingkey = "";
174                } else {
175                        exchange = this.exchange;
176                        routingkey = reference;
177                }
178
179                // TODO look this carefully
180                String appId = UID == null ? reference : UID;
181
182                // Add the correlation ID and create a replyTo property
183                BasicProperties props = new BasicProperties.Builder().appId(appId).correlationId(corrId).replyTo(replyQueueName)
184                                .type(serializerType).deliveryMode(deliveryMode).build();
185
186                // Publish the message
187                byte[] bytesRequest = serializer.serialize(serializerType, request);
188                broker.publishMessge(exchange, routingkey, props, bytesRequest);
189                logger.debug("Proxymq: " + reference + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange
190                                + ", replyQueue: " + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti()
191                                + ", async call: " + request.isAsync() + ", delivery mode: " + deliveryMode);
192        }
193
194        /**
195         * This method publishes a synchronous request
196         *
197         * @param request
198         *            - this request contains which method and which params will be
199         *            invoked in the server side.
200         * @param type
201         *            - indicates which return type we are waiting for
202         * @return serverResponse
203         * @throws Exception
204         */
205        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
206                String corrId = request.getId();
207
208                int retries = request.getRetries();
209                long timeout = request.getTimeout();
210
211                // Publish the message
212                int i = 0;
213                while (i < retries) {
214                        try {
215                                publishMessage(request, replyQueueName);
216                                if (request.isMulti()) {
217                                        return getResults(corrId, timeout, type);
218                                } else {
219                                        return getResult(corrId, timeout, type);
220                                }
221
222                        } catch (TimeoutException te) {
223                                logger.error(te);
224                        }
225                        i++;
226                }
227                throw new RetryException(retries, timeout);
228        }
229
230        /**
231         * This method creates a request using the annotations of the Remote
232         * interface
233         *
234         * @param method
235         *            - method to invoke in the server side
236         * @param arguments
237         *            - arguments of the method
238         * @return new Request
239         */
240        private Request createRequest(Method method, Object[] arguments) {
241                String corrId = java.util.UUID.randomUUID().toString();
242                String methodName = method.getName();
243                boolean multi = false;
244
245                if (method.getAnnotation(MultiMethod.class) != null) {
246                        multi = true;
247                }
248
249                // Since we need to know whether the method is async and if it has to
250                // return using an annotation, we'll only check the AsyncMethod
251                // annotation
252                if (method.getAnnotation(AsyncMethod.class) == null) {
253                        int retries = 1;
254                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
255                        if (method.getAnnotation(SyncMethod.class) != null) {
256                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
257                                retries = sync.retry();
258                                timeout = sync.timeout();
259                        }
260                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi);
261                } else {
262                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
263                }
264        }
265
266        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
267                Response resp = null;
268
269                // Wait for the results.
270                long localTimeout = 0;
271                long start = System.currentTimeMillis();
272                synchronized (results) {
273                        // Due to we are using notifyAll(), we need to control the real time
274                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
275                                results.wait(localTimeout);
276                                localTimeout = System.currentTimeMillis() - start;
277                        }
278                        if ((timeout - localTimeout) <= 0) {
279                                throw new TimeoutException("Timeout exception time: " + timeout);
280                        }
281                        resp = serializer.deserializeResponse(results.get(corrId), type);
282
283                        // Remove and indicate the key exists (a hashmap can contain a null
284                        // object, using this we'll know whether a response has been
285                        // received before)
286                        results.put(corrId, null);
287                }
288
289                if (resp.getError() != null) {
290                        OmqException error = resp.getError();
291                        String name = error.getType();
292                        String message = error.getMessage();
293                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
294                }
295
296                return resp.getResult();
297        }
298
299        /**
300         * This method returns an array with length @MultiMethod.waitNum() with all
301         * the responses received.
302         *
303         * @param corrId
304         *            - Correlation Id of the request
305         * @param timeout
306         *            - Timeout read in @SyncMethod.timeout(). If the timeout is set
307         *            in 2 seconds, the system will wait 2 seconds for the arriving
308         *            of all the responses.
309         * @param type
310         *            - Must be an Array type
311         * @return resultArray
312         * @throws Exception
313         */
314        private Object getResults(String corrId, long timeout, Class<?> type) throws Exception {
315                Response resp = null;
316                // Get the component type of an array
317                Class<?> actualType = type.getComponentType();
318
319                List<Object> list = new ArrayList<Object>();
320
321                int i = 0;
322                long localTimeout = 0;
323                long start = System.currentTimeMillis();
324
325                while (true) {
326                        synchronized (results) {
327                                // Due to we are using notifyAll(), we need to control the real
328                                // time
329                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
330                                        results.wait(localTimeout);
331                                        localTimeout = System.currentTimeMillis() - start;
332                                }
333                                if ((timeout - localTimeout) <= 0) {
334                                        break;
335                                }
336                                // Remove the corrId to receive new replies
337                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
338                                list.add(resp.getResult());
339                        }
340                        i++;
341                }
342
343                if (i == 0) {
344                        results.remove(corrId);
345                        throw new TimeoutException("Timeout exception time: " + timeout);
346                }
347
348                synchronized (results) {
349                        results.put(corrId, null);
350                }
351
352                Object array = Array.newInstance(actualType, i);
353                i = 0;
354                for (Object o : list) {
355                        Array.set(array, i++, o);
356                }
357
358                return array;
359        }
360
361        /**
362         * Gets the Map used internally to retreive the response of the server
363         *
364         * @return a map with all the keys processed. Every key is a correlation id
365         *         of a method invoked remotely
366         */
367        public Map<String, byte[]> getResults() {
368                return results;
369        }
370
371        @Override
372        public String getRef() {
373                return reference;
374        }
375
376        @Override
377        public String getUID() {
378                return UID;
379        }
380
381        @Override
382        public void setUID(String uID) {
383                this.UID = uID;
384        }
385
386}
Note: See TracBrowser for help on using the repository browser.