source: branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java @ 90

Last change on this file since 90 was 84, checked in by stoda, 11 years ago

Default queues added, default exchange enabled, more control in remote queues added.
Tests verified and changed Persistent test to show how to make persistent messages.

File size: 10.8 KB
Line 
1package omq.client.proxy;
2
3import java.lang.reflect.Array;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Properties;
9
10import omq.Remote;
11import omq.client.annotation.AsyncMethod;
12import omq.client.annotation.MultiMethod;
13import omq.client.annotation.SyncMethod;
14import omq.client.listener.ResponseListener;
15import omq.common.broker.Broker;
16import omq.common.message.Request;
17import omq.common.message.Response;
18import omq.common.util.ParameterQueue;
19import omq.common.util.Serializer;
20import omq.exception.OmqException;
21import omq.exception.RetryException;
22import omq.exception.TimeoutException;
23
24import org.apache.log4j.Logger;
25
26import com.rabbitmq.client.AMQP.BasicProperties;
27
28/**
29 * Proxymq class. This class inherits from InvocationHandler, for this reason
30 * each proxymq instance has an associated invocation handler. When a method is
31 * invoked on a proxymq instance, the method invocation is encoded and
32 * dispatched to the invoke method of its invocation handler.
33 *
34 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
35 *
36 */
37public class Proxymq implements InvocationHandler, Remote {
38
39        /**
40         *
41         */
42        private static final long serialVersionUID = 1L;
43        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
44        private static final String multi = "multi#";
45
46        private String uid;
47        private transient String exchange;
48        private transient String multiExchange;
49        private transient String replyQueueName;
50        private transient String serializerType;
51        private transient Broker broker;
52        private transient ResponseListener rListener;
53        private transient Serializer serializer;
54        private transient Properties env;
55        private transient Integer deliveryMode = null;
56        private transient Map<String, byte[]> results;
57
58        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
59        static {
60                primitiveClasses.put("byte", Byte.class);
61                primitiveClasses.put("short", Short.class);
62                primitiveClasses.put("char", Character.class);
63                primitiveClasses.put("int", Integer.class);
64                primitiveClasses.put("long", Long.class);
65                primitiveClasses.put("float", Float.class);
66                primitiveClasses.put("double", Double.class);
67        }
68
69        /**
70         * Proxymq Constructor.
71         *
72         * This constructor uses an uid to know which object will call. It also uses
73         * Properties to set where to send the messages
74         *
75         * @param uid
76         *            The uid represents the unique identifier of a remote object
77         * @param clazz
78         *            It represents the real class of the remote object. With this
79         *            class the system can know the remoteInterface used and it can
80         *            also see which annotations are used
81         * @param env
82         *            The environment is used to know where to send the messages
83         * @throws Exception
84         */
85        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
86                this.uid = uid;
87                this.broker = broker;
88                rListener = broker.getResponseListener();
89                serializer = broker.getSerializer();
90
91                // TODO what is better to use a new channel or to use the same?
92                // this.channel = Broker.getChannel();
93                env = broker.getEnvironment();
94                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
95                multiExchange = multi + uid;
96                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
97
98                // set the serializer type
99                serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA);
100                if (env.getProperty(ParameterQueue.DELIVERY_MODE) != null) {
101                        deliveryMode = Integer.parseInt(env.getProperty(ParameterQueue.DELIVERY_MODE));
102                }
103
104                // Create a new hashmap and registry it in rListener
105                results = new HashMap<String, byte[]>();
106                rListener.registerProxy(this);
107        }
108
109        @Override
110        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
111                // Local methods only
112                String methodName = method.getName();
113
114                // The local methods will be invoked here
115                if (method.getDeclaringClass().equals(Remote.class)) {
116                        if (methodName.equals("getRef")) {
117                                return getRef();
118                        }
119                }
120
121                // Create the request
122                Request request = createRequest(method, arguments);
123
124                Object response = null;
125                // Publish the request
126                if (request.isAsync()) {
127                        publishMessage(request, replyQueueName);
128                } else {
129                        response = publishSyncRequest(request, method.getReturnType());
130                }
131
132                return response;
133        }
134
135        /**
136         * This method publishes a request
137         *
138         * @param request
139         *            - this request contains which method and which params will be
140         *            invoked in the server side.
141         * @param replyQueueName
142         *            - this param indicates where the responseListener will be
143         *            listen to.
144         * @throws Exception
145         */
146        private void publishMessage(Request request, String replyQueueName) throws Exception {
147                String corrId = request.getId();
148
149                // Get the environment properties
150                String exchange;
151                String routingkey;
152
153                if (request.isMulti()) {
154                        exchange = multiExchange;
155                        routingkey = "";
156                } else {
157                        exchange = this.exchange;
158                        routingkey = uid;
159                }
160
161                // Add the correlation ID and create a replyTo property
162                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType)
163                                .deliveryMode(deliveryMode).build();
164
165                // Publish the message
166                byte[] bytesRequest = serializer.serialize(serializerType, request);
167                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
168                logger.debug("Proxymq: " + uid + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: "
169                                + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()
170                                + ", delivery mode: " + deliveryMode);
171        }
172
173        /**
174         * This method publishes a synchronous request
175         *
176         * @param request
177         *            - this request contains which method and which params will be
178         *            invoked in the server side.
179         * @param type
180         *            - indicates which return type we are waiting for
181         * @return serverResponse
182         * @throws Exception
183         */
184        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
185                String corrId = request.getId();
186
187                int retries = request.getRetries();
188                long timeout = request.getTimeout();
189
190                // Publish the message
191                int i = 0;
192                while (i < retries) {
193                        try {
194                                publishMessage(request, replyQueueName);
195                                if (request.isMulti()) {
196                                        return getResults(corrId, request.getWait(), timeout, type);
197                                } else {
198                                        return getResult(corrId, timeout, type);
199                                }
200
201                        } catch (TimeoutException te) {
202                                logger.error(te);
203                        }
204                        i++;
205                }
206                throw new RetryException(retries, timeout);
207        }
208
209        /**
210         * This method creates a request using the annotations of the Remote
211         * interface
212         *
213         * @param method
214         *            - method to invoke in the server side
215         * @param arguments
216         *            - arguments of the method
217         * @return new Request
218         */
219        private Request createRequest(Method method, Object[] arguments) {
220                String corrId = java.util.UUID.randomUUID().toString();
221                String methodName = method.getName();
222                boolean multi = false;
223                int wait = 0;
224
225                if (method.getAnnotation(MultiMethod.class) != null) {
226                        multi = true;
227                        wait = method.getAnnotation(MultiMethod.class).waitNum();
228                }
229
230                // Since we need to know whether the method is async and if it has to
231                // return using an annotation, we'll only check the AsyncMethod
232                // annotation
233                if (method.getAnnotation(AsyncMethod.class) == null) {
234                        int retries = 1;
235                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
236                        if (method.getAnnotation(SyncMethod.class) != null) {
237                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
238                                retries = sync.retry();
239                                timeout = sync.timeout();
240                        }
241                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
242                } else {
243                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
244                }
245        }
246
247        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
248                Response resp = null;
249
250                // Wait for the results.
251                long localTimeout = timeout;
252                long start = System.currentTimeMillis();
253                synchronized (results) {
254                        // Due to we are using notifyAll(), we need to control the real time
255                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
256                                results.wait(localTimeout);
257                                localTimeout = System.currentTimeMillis() - start;
258                        }
259                        if ((timeout - localTimeout) <= 0) {
260                                throw new TimeoutException("Timeout exception time: " + timeout);
261                        }
262                        resp = serializer.deserializeResponse(results.get(corrId), type);
263
264                        // Remove and indicate the key exists (a hashmap can contain a null
265                        // object, using this we'll know whether a response has been
266                        // received before)
267                        results.put(corrId, null);
268                }
269
270                if (resp.getError() != null) {
271                        OmqException error = resp.getError();
272                        String name = error.getType();
273                        String message = error.getMessage();
274                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
275                }
276
277                return resp.getResult();
278        }
279
280        /**
281         * This method returns an array with length @MultiMethod.waitNum() with all
282         * the responses received.
283         *
284         * @param corrId
285         *            - Correlation Id of the request
286         * @param wait
287         *            - Array length
288         * @param timeout
289         *            - Timeout read in @SyncMethod.timeout(). If the timeout is set
290         *            in 2 seconds, the system will wait 2 seconds for the arriving
291         *            of all the responses.
292         * @param type
293         *            - Must be an Array type
294         * @return resultArray
295         * @throws Exception
296         */
297        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
298                Response resp = null;
299                // Get the component type of an array
300                Class<?> actualType = type.getComponentType();
301
302                Object array = Array.newInstance(actualType, wait);
303
304                int i = 0;
305                long localTimeout = timeout;
306                long start = System.currentTimeMillis();
307
308                while (i < wait) {
309                        synchronized (results) {
310                                // Due to we are using notifyAll(), we need to control the real
311                                // time
312                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
313                                        results.wait(localTimeout);
314                                        localTimeout = System.currentTimeMillis() - start;
315                                }
316                                if ((timeout - localTimeout) <= 0) {
317                                        throw new TimeoutException("Timeout exception time: " + timeout);
318                                }
319                                // Remove the corrId to receive new replies
320                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
321                                Array.set(array, i, resp.getResult());
322                        }
323                        i++;
324                }
325                synchronized (results) {
326                        results.put(corrId, null);
327                }
328
329                return array;
330        }
331
332        /**
333         * Gets the Map used internally to retreive the response of the server
334         *
335         * @return a map with all the keys processed. Every key is a correlation id
336         *         of a method invoked remotely
337         */
338        public Map<String, byte[]> getResults() {
339                return results;
340        }
341
342        @Override
343        public String getRef() {
344                return uid;
345        }
346
347}
Note: See TracBrowser for help on using the repository browser.