source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 83

Last change on this file since 83 was 83, checked in by stoda, 11 years ago

J

File size: 10.8 KB
RevLine 
[44]1package omq.client.proxy;
2
[58]3import java.lang.reflect.Array;
[44]4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Properties;
9
10import omq.Remote;
11import omq.client.annotation.AsyncMethod;
[54]12import omq.client.annotation.MultiMethod;
[44]13import omq.client.annotation.SyncMethod;
14import omq.client.listener.ResponseListener;
15import omq.common.broker.Broker;
16import omq.common.message.Request;
17import omq.common.message.Response;
18import omq.common.util.ParameterQueue;
19import omq.common.util.Serializer;
20import omq.exception.OmqException;
21import omq.exception.RetryException;
22import omq.exception.TimeoutException;
23
[58]24import org.apache.log4j.Logger;
25
[44]26import com.rabbitmq.client.AMQP.BasicProperties;
27
28/**
[82]29 * Proxymq class. This class inherits from InvocationHandler, for this reason
30 * each proxymq instance has an associated invocation handler. When a method is
31 * invoked on a proxymq instance, the method invocation is encoded and
32 * dispatched to the invoke method of its invocation handler.
[44]33 *
34 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
35 *
36 */
37public class Proxymq implements InvocationHandler, Remote {
38
39        /**
40         *
41         */
42        private static final long serialVersionUID = 1L;
[49]43        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
[55]44        private static final String multi = "multi#";
[44]45
46        private String uid;
[70]47        private transient String exchange;
48        private transient String multiExchange;
49        private transient String replyQueueName;
[47]50        private transient String serializerType;
[53]51        private transient Broker broker;
[44]52        private transient ResponseListener rListener;
[53]53        private transient Serializer serializer;
[44]54        private transient Properties env;
[77]55        private transient Integer deliveryMode = null;
[44]56        private transient Map<String, byte[]> results;
57
58        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
59        static {
60                primitiveClasses.put("byte", Byte.class);
61                primitiveClasses.put("short", Short.class);
62                primitiveClasses.put("char", Character.class);
63                primitiveClasses.put("int", Integer.class);
64                primitiveClasses.put("long", Long.class);
65                primitiveClasses.put("float", Float.class);
66                primitiveClasses.put("double", Double.class);
67        }
68
69        /**
[82]70         * Proxymq Constructor.
[44]71         *
72         * This constructor uses an uid to know which object will call. It also uses
73         * Properties to set where to send the messages
74         *
75         * @param uid
76         *            The uid represents the unique identifier of a remote object
77         * @param clazz
78         *            It represents the real class of the remote object. With this
79         *            class the system can know the remoteInterface used and it can
80         *            also see which annotations are used
81         * @param env
82         *            The environment is used to know where to send the messages
83         * @throws Exception
84         */
[53]85        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
[44]86                this.uid = uid;
[53]87                this.broker = broker;
88                rListener = broker.getResponseListener();
89                serializer = broker.getSerializer();
[44]90
91                // TODO what is better to use a new channel or to use the same?
92                // this.channel = Broker.getChannel();
[53]93                env = broker.getEnvironment();
[70]94                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
[75]95                multiExchange = multi + uid;
[70]96                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
[44]97
[47]98                // set the serializer type
[77]99                serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA);
100                if (env.getProperty(ParameterQueue.DELIVERY_MODE) != null) {
101                        deliveryMode = Integer.parseInt(env.getProperty(ParameterQueue.DELIVERY_MODE));
102                }
[47]103
[44]104                // Create a new hashmap and registry it in rListener
105                results = new HashMap<String, byte[]>();
106                rListener.registerProxy(this);
107        }
108
109        @Override
110        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
111                // Local methods only
112                String methodName = method.getName();
113
114                // The local methods will be invoked here
115                if (method.getDeclaringClass().equals(Remote.class)) {
116                        if (methodName.equals("getRef")) {
117                                return getRef();
118                        }
119                }
120
121                // Create the request
122                Request request = createRequest(method, arguments);
123
124                Object response = null;
125                // Publish the request
126                if (request.isAsync()) {
[70]127                        publishMessage(request, replyQueueName);
[44]128                } else {
129                        response = publishSyncRequest(request, method.getReturnType());
130                }
131
132                return response;
133        }
134
[82]135        /**
136         * This method publishes a request
137         *
138         * @param request
139         *            - this request contains which method and which params will be
140         *            invoked in the server side.
141         * @param replyQueueName
142         *            - this param indicates where the responseListener will be
143         *            listen to.
144         * @throws Exception
145         */
[44]146        private void publishMessage(Request request, String replyQueueName) throws Exception {
147                String corrId = request.getId();
148
149                // Get the environment properties
[55]150                String exchange;
151                String routingkey;
[44]152
[55]153                if (request.isMulti()) {
[70]154                        exchange = multiExchange;
[55]155                        routingkey = "";
156                } else {
[70]157                        exchange = this.exchange;
[55]158                        routingkey = uid;
159                }
160
[44]161                // Add the correlation ID and create a replyTo property
[77]162                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType)
163                                .deliveryMode(deliveryMode).build();
[44]164
165                // Publish the message
[53]166                byte[] bytesRequest = serializer.serialize(serializerType, request);
167                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
[77]168                logger.debug("Proxymq: " + uid + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: "
169                                + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()
170                                + ", delivery mode: " + deliveryMode);
[44]171        }
172
[82]173        /**
174         * This method publishes a synchronous request
175         *
176         * @param request
177         *            - this request contains which method and which params will be
178         *            invoked in the server side.
179         * @param type
180         *            - indicates which return type we are waiting for
181         * @return serverResponse
182         * @throws Exception
183         */
[44]184        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
185                String corrId = request.getId();
186
187                int retries = request.getRetries();
188                long timeout = request.getTimeout();
189
190                // Publish the message
191                int i = 0;
192                while (i < retries) {
193                        try {
194                                publishMessage(request, replyQueueName);
[58]195                                if (request.isMulti()) {
[82]196                                        return getResults(corrId, request.getWait(), timeout, type);
[58]197                                } else {
198                                        return getResult(corrId, timeout, type);
199                                }
200
[44]201                        } catch (TimeoutException te) {
[49]202                                logger.error(te);
[44]203                        }
204                        i++;
205                }
206                throw new RetryException(retries, timeout);
207        }
208
[82]209        /**
210         * This method creates a request using the annotations of the Remote
211         * interface
212         *
213         * @param method
214         *            - method to invoke in the server side
215         * @param arguments
216         *            - arguments of the method
217         * @return new Request
218         */
[44]219        private Request createRequest(Method method, Object[] arguments) {
220                String corrId = java.util.UUID.randomUUID().toString();
221                String methodName = method.getName();
[54]222                boolean multi = false;
[58]223                int wait = 0;
[44]224
[54]225                if (method.getAnnotation(MultiMethod.class) != null) {
226                        multi = true;
[58]227                        wait = method.getAnnotation(MultiMethod.class).waitNum();
[54]228                }
229
[44]230                // Since we need to know whether the method is async and if it has to
231                // return using an annotation, we'll only check the AsyncMethod
232                // annotation
233                if (method.getAnnotation(AsyncMethod.class) == null) {
234                        int retries = 1;
235                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
236                        if (method.getAnnotation(SyncMethod.class) != null) {
237                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
238                                retries = sync.retry();
239                                timeout = sync.timeout();
240                        }
[58]241                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
[44]242                } else {
[54]243                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
[44]244                }
245        }
246
247        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
248                Response resp = null;
249
250                // Wait for the results.
[58]251                long localTimeout = timeout;
[44]252                long start = System.currentTimeMillis();
253                synchronized (results) {
254                        // Due to we are using notifyAll(), we need to control the real time
255                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
[58]256                                results.wait(localTimeout);
[44]257                                localTimeout = System.currentTimeMillis() - start;
258                        }
259                        if ((timeout - localTimeout) <= 0) {
260                                throw new TimeoutException("Timeout exception time: " + timeout);
261                        }
[53]262                        resp = serializer.deserializeResponse(results.get(corrId), type);
[44]263
264                        // Remove and indicate the key exists (a hashmap can contain a null
265                        // object, using this we'll know whether a response has been
266                        // received before)
267                        results.put(corrId, null);
268                }
269
270                if (resp.getError() != null) {
271                        OmqException error = resp.getError();
272                        String name = error.getType();
273                        String message = error.getMessage();
274                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
275                }
276
277                return resp.getResult();
278        }
279
[83]280        /**
281         * This method returns an array with length @MultiMethod.waitNum() with all
282         * the responses received.
283         *
284         * @param corrId
285         *            - Correlation Id of the request
286         * @param wait
287         *            - Array length
288         * @param timeout
289         *            - Timeout read in @SyncMethod.timeout(). If the timeout is set
290         *            in 2 seconds, the system will wait 2 seconds for the arriving
291         *            of all the responses.
292         * @param type
293         *            - Must be an Array type
294         * @return resultArray
295         * @throws Exception
296         */
[57]297        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
298                Response resp = null;
[83]299                // Get the component type of an array
[58]300                Class<?> actualType = type.getComponentType();
[57]301
[58]302                Object array = Array.newInstance(actualType, wait);
303
[57]304                int i = 0;
305                long localTimeout = timeout;
306                long start = System.currentTimeMillis();
307
308                while (i < wait) {
309                        synchronized (results) {
310                                // Due to we are using notifyAll(), we need to control the real
311                                // time
312                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
313                                        results.wait(localTimeout);
314                                        localTimeout = System.currentTimeMillis() - start;
315                                }
316                                if ((timeout - localTimeout) <= 0) {
317                                        throw new TimeoutException("Timeout exception time: " + timeout);
318                                }
319                                // Remove the corrId to receive new replies
[58]320                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
321                                Array.set(array, i, resp.getResult());
[57]322                        }
323                        i++;
324                }
325                synchronized (results) {
326                        results.put(corrId, null);
327                }
328
[58]329                return array;
[57]330        }
331
[44]332        /**
333         * Gets the Map used internally to retreive the response of the server
334         *
335         * @return a map with all the keys processed. Every key is a correlation id
336         *         of a method invoked remotely
337         */
338        public Map<String, byte[]> getResults() {
339                return results;
340        }
341
342        @Override
343        public String getRef() {
344                return uid;
345        }
346
347}
Note: See TracBrowser for help on using the repository browser.