source: trunk/src/main/java/omq/client/proxy/Proxymq.java

Last change on this file was 98, checked in by stoda, 11 years ago

0.5.6
Synchronized channel and reopening when they are closed

File size: 10.8 KB
Line 
1package omq.client.proxy;
2
3import java.lang.reflect.Array;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Properties;
9
10import omq.Remote;
11import omq.client.annotation.AsyncMethod;
12import omq.client.annotation.MultiMethod;
13import omq.client.annotation.SyncMethod;
14import omq.client.listener.ResponseListener;
15import omq.common.broker.Broker;
16import omq.common.message.Request;
17import omq.common.message.Response;
18import omq.common.util.ParameterQueue;
19import omq.common.util.Serializer;
20import omq.exception.OmqException;
21import omq.exception.RetryException;
22import omq.exception.TimeoutException;
23
24import org.apache.log4j.Logger;
25
26import com.rabbitmq.client.AMQP.BasicProperties;
27
28/**
29 * Proxymq class. This class inherits from InvocationHandler, for this reason
30 * each proxymq instance has an associated invocation handler. When a method is
31 * invoked on a proxymq instance, the method invocation is encoded and
32 * dispatched to the invoke method of its invocation handler.
33 *
34 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
35 *
36 */
37public class Proxymq implements InvocationHandler, Remote {
38
39        /**
40         *
41         */
42        private static final long serialVersionUID = 1L;
43        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
44        private static final String multi = "multi#";
45
46        private String uid;
47        private transient String exchange;
48        private transient String multiExchange;
49        private transient String replyQueueName;
50        private transient String serializerType;
51        private transient Broker broker;
52        private transient ResponseListener rListener;
53        private transient Serializer serializer;
54        private transient Properties env;
55        private transient Integer deliveryMode = null;
56        private transient Map<String, byte[]> results;
57
58        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
59        static {
60                primitiveClasses.put("byte", Byte.class);
61                primitiveClasses.put("short", Short.class);
62                primitiveClasses.put("char", Character.class);
63                primitiveClasses.put("int", Integer.class);
64                primitiveClasses.put("long", Long.class);
65                primitiveClasses.put("float", Float.class);
66                primitiveClasses.put("double", Double.class);
67        }
68
69        /**
70         * Proxymq Constructor.
71         *
72         * This constructor uses an uid to know which object will call. It also uses
73         * Properties to set where to send the messages
74         *
75         * @param uid
76         *            The uid represents the unique identifier of a remote object
77         * @param clazz
78         *            It represents the real class of the remote object. With this
79         *            class the system can know the remoteInterface used and it can
80         *            also see which annotations are used
81         * @param env
82         *            The environment is used to know where to send the messages
83         * @throws Exception
84         */
85        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
86                this.uid = uid;
87                this.broker = broker;
88                rListener = broker.getResponseListener();
89                serializer = broker.getSerializer();
90
91                // TODO what is better to use a new channel or to use the same?
92                // this.channel = Broker.getChannel();
93                env = broker.getEnvironment();
94                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
95                multiExchange = multi + uid;
96                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
97
98                // set the serializer type
99                serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA);
100                if (env.getProperty(ParameterQueue.DELIVERY_MODE) != null) {
101                        deliveryMode = Integer.parseInt(env.getProperty(ParameterQueue.DELIVERY_MODE));
102                }
103
104                // Create a new hashmap and registry it in rListener
105                results = new HashMap<String, byte[]>();
106                rListener.registerProxy(this);
107        }
108
109        @Override
110        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
111                // Local methods only
112                String methodName = method.getName();
113
114                // The local methods will be invoked here
115                if (method.getDeclaringClass().equals(Remote.class)) {
116                        if (methodName.equals("getRef")) {
117                                return getRef();
118                        }
119                }
120
121                // Create the request
122                Request request = createRequest(method, arguments);
123
124                Object response = null;
125                // Publish the request
126                if (request.isAsync()) {
127                        publishMessage(request, replyQueueName);
128                } else {
129                        response = publishSyncRequest(request, method.getReturnType());
130                }
131
132                return response;
133        }
134
135        /**
136         * This method publishes a request
137         *
138         * @param request
139         *            - this request contains which method and which params will be
140         *            invoked in the server side.
141         * @param replyQueueName
142         *            - this param indicates where the responseListener will be
143         *            listen to.
144         * @throws Exception
145         */
146        private void publishMessage(Request request, String replyQueueName) throws Exception {
147                String corrId = request.getId();
148
149                // Get the environment properties
150                String exchange;
151                String routingkey;
152
153                if (request.isMulti()) {
154                        exchange = multiExchange;
155                        routingkey = "";
156                } else {
157                        exchange = this.exchange;
158                        routingkey = uid;
159                }
160
161                // Add the correlation ID and create a replyTo property
162                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType)
163                                .deliveryMode(deliveryMode).build();
164
165                // Publish the message
166                byte[] bytesRequest = serializer.serialize(serializerType, request);
167                broker.publishMessge(exchange, routingkey, props, bytesRequest);
168                logger.debug("Proxymq: " + uid + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: "
169                                + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()
170                                + ", delivery mode: " + deliveryMode);
171        }
172
173        /**
174         * This method publishes a synchronous request
175         *
176         * @param request
177         *            - this request contains which method and which params will be
178         *            invoked in the server side.
179         * @param type
180         *            - indicates which return type we are waiting for
181         * @return serverResponse
182         * @throws Exception
183         */
184        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
185                String corrId = request.getId();
186
187                int retries = request.getRetries();
188                long timeout = request.getTimeout();
189
190                // Publish the message
191                int i = 0;
192                while (i < retries) {
193                        try {
194                                publishMessage(request, replyQueueName);
195                                if (request.isMulti()) {
196                                        return getResults(corrId, request.getWait(), timeout, type);
197                                } else {
198                                        return getResult(corrId, timeout, type);
199                                }
200
201                        } catch (TimeoutException te) {
202                                logger.error(te);
203                        }
204                        i++;
205                }
206                throw new RetryException(retries, timeout);
207        }
208
209        /**
210         * This method creates a request using the annotations of the Remote
211         * interface
212         *
213         * @param method
214         *            - method to invoke in the server side
215         * @param arguments
216         *            - arguments of the method
217         * @return new Request
218         */
219        private Request createRequest(Method method, Object[] arguments) {
220                String corrId = java.util.UUID.randomUUID().toString();
221                String methodName = method.getName();
222                boolean multi = false;
223                int wait = 0;
224
225                if (method.getAnnotation(MultiMethod.class) != null) {
226                        multi = true;
227                        wait = method.getAnnotation(MultiMethod.class).waitNum();
228                }
229
230                // Since we need to know whether the method is async and if it has to
231                // return using an annotation, we'll only check the AsyncMethod
232                // annotation
233                if (method.getAnnotation(AsyncMethod.class) == null) {
234                        int retries = 1;
235                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
236                        if (method.getAnnotation(SyncMethod.class) != null) {
237                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
238                                retries = sync.retry();
239                                timeout = sync.timeout();
240                        }
241                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
242                } else {
243                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
244                }
245        }
246
247        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
248                Response resp = null;
249
250                // Wait for the results.
251                long localTimeout = timeout;
252                long start = System.currentTimeMillis();
253                synchronized (results) {
254                        // Due to we are using notifyAll(), we need to control the real time
255                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
256                                results.wait(localTimeout);
257                                localTimeout = System.currentTimeMillis() - start;
258                        }
259                        if ((timeout - localTimeout) <= 0) {
260                                throw new TimeoutException("Timeout exception time: " + timeout);
261                        }
262                        resp = serializer.deserializeResponse(results.get(corrId), type);
263
264                        // Remove and indicate the key exists (a hashmap can contain a null
265                        // object, using this we'll know whether a response has been
266                        // received before)
267                        results.put(corrId, null);
268                }
269
270                if (resp.getError() != null) {
271                        OmqException error = resp.getError();
272                        String name = error.getType();
273                        String message = error.getMessage();
274                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
275                }
276
277                return resp.getResult();
278        }
279
280        /**
281         * This method returns an array with length @MultiMethod.waitNum() with all
282         * the responses received.
283         *
284         * @param corrId
285         *            - Correlation Id of the request
286         * @param wait
287         *            - Array length
288         * @param timeout
289         *            - Timeout read in @SyncMethod.timeout(). If the timeout is set
290         *            in 2 seconds, the system will wait 2 seconds for the arriving
291         *            of all the responses.
292         * @param type
293         *            - Must be an Array type
294         * @return resultArray
295         * @throws Exception
296         */
297        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
298                Response resp = null;
299                // Get the component type of an array
300                Class<?> actualType = type.getComponentType();
301
302                Object array = Array.newInstance(actualType, wait);
303
304                int i = 0;
305                long localTimeout = timeout;
306                long start = System.currentTimeMillis();
307
308                while (i < wait) {
309                        synchronized (results) {
310                                // Due to we are using notifyAll(), we need to control the real
311                                // time
312                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
313                                        results.wait(localTimeout);
314                                        localTimeout = System.currentTimeMillis() - start;
315                                }
316                                if ((timeout - localTimeout) <= 0) {
317                                        throw new TimeoutException("Timeout exception time: " + timeout);
318                                }
319                                // Remove the corrId to receive new replies
320                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
321                                Array.set(array, i, resp.getResult());
322                        }
323                        i++;
324                }
325                synchronized (results) {
326                        results.put(corrId, null);
327                }
328
329                return array;
330        }
331
332        /**
333         * Gets the Map used internally to retreive the response of the server
334         *
335         * @return a map with all the keys processed. Every key is a correlation id
336         *         of a method invoked remotely
337         */
338        public Map<String, byte[]> getResults() {
339                return results;
340        }
341
342        @Override
343        public String getRef() {
344                return uid;
345        }
346
347}
Note: See TracBrowser for help on using the repository browser.