source: branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java

Last change on this file was 111, checked in by stoda, 11 years ago

Broker:

tryconnection won't be used
DOMConfigurator won't be used


File size: 11.5 KB
RevLine 
[44]1package omq.client.proxy;
2
[58]3import java.lang.reflect.Array;
[44]4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
[104]6import java.util.ArrayList;
[44]7import java.util.HashMap;
[104]8import java.util.List;
[44]9import java.util.Map;
10import java.util.Properties;
11
12import omq.Remote;
13import omq.client.annotation.AsyncMethod;
[54]14import omq.client.annotation.MultiMethod;
[44]15import omq.client.annotation.SyncMethod;
16import omq.client.listener.ResponseListener;
17import omq.common.broker.Broker;
18import omq.common.message.Request;
19import omq.common.message.Response;
20import omq.common.util.ParameterQueue;
21import omq.common.util.Serializer;
22import omq.exception.OmqException;
23import omq.exception.RetryException;
24import omq.exception.TimeoutException;
25
[58]26import org.apache.log4j.Logger;
27
[44]28import com.rabbitmq.client.AMQP.BasicProperties;
29
30/**
[82]31 * Proxymq class. This class inherits from InvocationHandler, for this reason
32 * each proxymq instance has an associated invocation handler. When a method is
33 * invoked on a proxymq instance, the method invocation is encoded and
34 * dispatched to the invoke method of its invocation handler.
[44]35 *
36 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
37 *
38 */
39public class Proxymq implements InvocationHandler, Remote {
40
41        /**
42         *
43         */
44        private static final long serialVersionUID = 1L;
[49]45        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
[55]46        private static final String multi = "multi#";
[44]47
[105]48        private String reference;
[107]49        private String UID;
[70]50        private transient String exchange;
51        private transient String multiExchange;
52        private transient String replyQueueName;
[47]53        private transient String serializerType;
[53]54        private transient Broker broker;
[44]55        private transient ResponseListener rListener;
[53]56        private transient Serializer serializer;
[44]57        private transient Properties env;
[77]58        private transient Integer deliveryMode = null;
[44]59        private transient Map<String, byte[]> results;
60
61        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
62        static {
63                primitiveClasses.put("byte", Byte.class);
64                primitiveClasses.put("short", Short.class);
65                primitiveClasses.put("char", Character.class);
66                primitiveClasses.put("int", Integer.class);
67                primitiveClasses.put("long", Long.class);
68                primitiveClasses.put("float", Float.class);
69                primitiveClasses.put("double", Double.class);
70        }
71
72        /**
[82]73         * Proxymq Constructor.
[44]74         *
[107]75         * This constructor uses an reference to know which object will call. It
76         * also uses Properties to set where to send the messages
[44]77         *
[105]78         * @param reference
[107]79         *            The reference represents the unique identifier of a remote
80         *            object
[44]81         * @param clazz
82         *            It represents the real class of the remote object. With this
83         *            class the system can know the remoteInterface used and it can
84         *            also see which annotations are used
85         * @param env
86         *            The environment is used to know where to send the messages
87         * @throws Exception
88         */
[105]89        public Proxymq(String reference, Class<?> clazz, Broker broker) throws Exception {
90                this.reference = reference;
[53]91                this.broker = broker;
92                rListener = broker.getResponseListener();
93                serializer = broker.getSerializer();
[44]94
95                // TODO what is better to use a new channel or to use the same?
96                // this.channel = Broker.getChannel();
[53]97                env = broker.getEnvironment();
[84]98                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
[105]99                multiExchange = multi + reference;
[70]100                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
[44]101
[47]102                // set the serializer type
[77]103                serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA);
104                if (env.getProperty(ParameterQueue.DELIVERY_MODE) != null) {
105                        deliveryMode = Integer.parseInt(env.getProperty(ParameterQueue.DELIVERY_MODE));
106                }
[47]107
[44]108                // Create a new hashmap and registry it in rListener
109                results = new HashMap<String, byte[]>();
110                rListener.registerProxy(this);
111        }
112
113        @Override
114        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
115                // Local methods only
116                String methodName = method.getName();
117
118                // The local methods will be invoked here
119                if (method.getDeclaringClass().equals(Remote.class)) {
120                        if (methodName.equals("getRef")) {
121                                return getRef();
122                        }
[107]123                        if (methodName.equals("getUID")) {
124                                return getUID();
125                        }
126                        if (methodName.equals("setUID")) {
127                                setUID((String) arguments[0]);
128                                return null;
129                        }
[92]130                        if (methodName.equals("equals")) {
131                                if (arguments[0] instanceof Remote) {
132                                        return getRef().equals(((Remote) arguments[0]).getRef());
133                                } else {
134                                        return false;
135                                }
136                        }
[44]137                }
138
139                // Create the request
140                Request request = createRequest(method, arguments);
141
142                Object response = null;
143                // Publish the request
144                if (request.isAsync()) {
[70]145                        publishMessage(request, replyQueueName);
[44]146                } else {
147                        response = publishSyncRequest(request, method.getReturnType());
148                }
149
150                return response;
151        }
152
[82]153        /**
154         * This method publishes a request
155         *
156         * @param request
157         *            - this request contains which method and which params will be
158         *            invoked in the server side.
159         * @param replyQueueName
160         *            - this param indicates where the responseListener will be
161         *            listen to.
162         * @throws Exception
163         */
[44]164        private void publishMessage(Request request, String replyQueueName) throws Exception {
165                String corrId = request.getId();
166
167                // Get the environment properties
[55]168                String exchange;
169                String routingkey;
[44]170
[55]171                if (request.isMulti()) {
[70]172                        exchange = multiExchange;
[55]173                        routingkey = "";
174                } else {
[70]175                        exchange = this.exchange;
[105]176                        routingkey = reference;
[55]177                }
178
[107]179                // TODO look this carefully
180                String appId = UID == null ? reference : UID;
181
[44]182                // Add the correlation ID and create a replyTo property
[107]183                BasicProperties props = new BasicProperties.Builder().appId(appId).correlationId(corrId).replyTo(replyQueueName)
184                                .type(serializerType).deliveryMode(deliveryMode).build();
[44]185
186                // Publish the message
[53]187                byte[] bytesRequest = serializer.serialize(serializerType, request);
[99]188                broker.publishMessge(exchange, routingkey, props, bytesRequest);
[107]189                logger.debug("Proxymq: " + reference + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange
190                                + ", replyQueue: " + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti()
191                                + ", async call: " + request.isAsync() + ", delivery mode: " + deliveryMode);
[44]192        }
193
[82]194        /**
195         * This method publishes a synchronous request
196         *
197         * @param request
198         *            - this request contains which method and which params will be
199         *            invoked in the server side.
200         * @param type
201         *            - indicates which return type we are waiting for
202         * @return serverResponse
203         * @throws Exception
204         */
[44]205        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
206                String corrId = request.getId();
207
208                int retries = request.getRetries();
209                long timeout = request.getTimeout();
210
211                // Publish the message
212                int i = 0;
213                while (i < retries) {
214                        try {
215                                publishMessage(request, replyQueueName);
[58]216                                if (request.isMulti()) {
[104]217                                        return getResults(corrId, timeout, type);
[58]218                                } else {
219                                        return getResult(corrId, timeout, type);
220                                }
221
[44]222                        } catch (TimeoutException te) {
[49]223                                logger.error(te);
[44]224                        }
225                        i++;
226                }
227                throw new RetryException(retries, timeout);
228        }
229
[82]230        /**
231         * This method creates a request using the annotations of the Remote
232         * interface
233         *
234         * @param method
235         *            - method to invoke in the server side
236         * @param arguments
237         *            - arguments of the method
238         * @return new Request
239         */
[44]240        private Request createRequest(Method method, Object[] arguments) {
241                String corrId = java.util.UUID.randomUUID().toString();
242                String methodName = method.getName();
[54]243                boolean multi = false;
[44]244
[54]245                if (method.getAnnotation(MultiMethod.class) != null) {
246                        multi = true;
247                }
248
[44]249                // Since we need to know whether the method is async and if it has to
250                // return using an annotation, we'll only check the AsyncMethod
251                // annotation
252                if (method.getAnnotation(AsyncMethod.class) == null) {
253                        int retries = 1;
254                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
255                        if (method.getAnnotation(SyncMethod.class) != null) {
256                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
257                                retries = sync.retry();
258                                timeout = sync.timeout();
259                        }
[104]260                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi);
[44]261                } else {
[54]262                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
[44]263                }
264        }
265
266        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
267                Response resp = null;
268
269                // Wait for the results.
[111]270                long localTimeout = 0;
[44]271                long start = System.currentTimeMillis();
272                synchronized (results) {
273                        // Due to we are using notifyAll(), we need to control the real time
274                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
[58]275                                results.wait(localTimeout);
[44]276                                localTimeout = System.currentTimeMillis() - start;
277                        }
278                        if ((timeout - localTimeout) <= 0) {
279                                throw new TimeoutException("Timeout exception time: " + timeout);
280                        }
[53]281                        resp = serializer.deserializeResponse(results.get(corrId), type);
[44]282
283                        // Remove and indicate the key exists (a hashmap can contain a null
284                        // object, using this we'll know whether a response has been
285                        // received before)
286                        results.put(corrId, null);
287                }
288
289                if (resp.getError() != null) {
290                        OmqException error = resp.getError();
291                        String name = error.getType();
292                        String message = error.getMessage();
293                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
294                }
295
296                return resp.getResult();
297        }
298
[83]299        /**
300         * This method returns an array with length @MultiMethod.waitNum() with all
301         * the responses received.
302         *
303         * @param corrId
304         *            - Correlation Id of the request
305         * @param timeout
306         *            - Timeout read in @SyncMethod.timeout(). If the timeout is set
307         *            in 2 seconds, the system will wait 2 seconds for the arriving
308         *            of all the responses.
309         * @param type
310         *            - Must be an Array type
311         * @return resultArray
312         * @throws Exception
313         */
[104]314        private Object getResults(String corrId, long timeout, Class<?> type) throws Exception {
[57]315                Response resp = null;
[83]316                // Get the component type of an array
[58]317                Class<?> actualType = type.getComponentType();
[57]318
[104]319                List<Object> list = new ArrayList<Object>();
[58]320
[57]321                int i = 0;
[111]322                long localTimeout = 0;
[57]323                long start = System.currentTimeMillis();
324
[104]325                while (true) {
[57]326                        synchronized (results) {
327                                // Due to we are using notifyAll(), we need to control the real
328                                // time
329                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
330                                        results.wait(localTimeout);
331                                        localTimeout = System.currentTimeMillis() - start;
332                                }
333                                if ((timeout - localTimeout) <= 0) {
[104]334                                        break;
[57]335                                }
336                                // Remove the corrId to receive new replies
[58]337                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
[104]338                                list.add(resp.getResult());
[57]339                        }
340                        i++;
341                }
[104]342
343                if (i == 0) {
344                        results.remove(corrId);
345                        throw new TimeoutException("Timeout exception time: " + timeout);
346                }
347
[57]348                synchronized (results) {
349                        results.put(corrId, null);
350                }
351
[104]352                Object array = Array.newInstance(actualType, i);
353                i = 0;
354                for (Object o : list) {
355                        Array.set(array, i++, o);
356                }
357
[58]358                return array;
[57]359        }
360
[44]361        /**
362         * Gets the Map used internally to retreive the response of the server
363         *
364         * @return a map with all the keys processed. Every key is a correlation id
365         *         of a method invoked remotely
366         */
367        public Map<String, byte[]> getResults() {
368                return results;
369        }
370
371        @Override
372        public String getRef() {
[105]373                return reference;
[44]374        }
375
[107]376        @Override
377        public String getUID() {
378                return UID;
379        }
380
381        @Override
[111]382        public void setUID(String uID) {
[107]383                this.UID = uID;
384        }
385
[44]386}
Note: See TracBrowser for help on using the repository browser.