source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 63

Last change on this file since 63 was 62, checked in by gguerrero, 11 years ago
File size: 11.0 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.Array;
5import java.lang.reflect.InvocationHandler;
6import java.lang.reflect.Method;
7import java.lang.reflect.Proxy;
8import java.util.Collection;
9import java.util.HashMap;
10import java.util.Map;
11import java.util.Properties;
12
13import omq.Remote;
14import omq.client.annotation.AsyncMethod;
15import omq.client.annotation.MultiMethod;
16import omq.client.annotation.SyncMethod;
17import omq.client.listener.ResponseListener;
18import omq.common.broker.Broker;
19import omq.common.event.Event;
20import omq.common.event.EventDispatcher;
21import omq.common.event.EventListener;
22import omq.common.message.Request;
23import omq.common.message.Response;
24import omq.common.util.ParameterQueue;
25import omq.common.util.Serializer;
26import omq.exception.OmqException;
27import omq.exception.RetryException;
28import omq.exception.SerializerException;
29import omq.exception.TimeoutException;
30
31import org.apache.log4j.Logger;
32
33import com.rabbitmq.client.AMQP.BasicProperties;
34
35/**
36 * EvoProxy class. This class inherits from InvocationHandler and gives you a
37 * proxy with a server using an environment
38 *
39 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
40 *
41 */
42public class Proxymq implements InvocationHandler, Remote {
43
44        /**
45         *
46         */
47        private static final long serialVersionUID = 1L;
48        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
49        private static final String multi = "multi#";
50
51        private String uid;
52        private transient String serializerType;
53        private transient Broker broker;
54        private transient ResponseListener rListener;
55        private transient EventDispatcher dispatcher;
56        private transient Serializer serializer;
57        private transient Properties env;
58        private transient Map<String, byte[]> results;
59        private transient Map<String, EventListener<?>> listeners;
60
61        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
62        static {
63                primitiveClasses.put("byte", Byte.class);
64                primitiveClasses.put("short", Short.class);
65                primitiveClasses.put("char", Character.class);
66                primitiveClasses.put("int", Integer.class);
67                primitiveClasses.put("long", Long.class);
68                primitiveClasses.put("float", Float.class);
69                primitiveClasses.put("double", Double.class);
70        }
71
72        /**
73         * EvoProxy Constructor.
74         *
75         * This constructor uses an uid to know which object will call. It also uses
76         * Properties to set where to send the messages
77         *
78         * @param uid
79         *            The uid represents the unique identifier of a remote object
80         * @param clazz
81         *            It represents the real class of the remote object. With this
82         *            class the system can know the remoteInterface used and it can
83         *            also see which annotations are used
84         * @param env
85         *            The environment is used to know where to send the messages
86         * @throws Exception
87         */
88        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
89                this.uid = uid;
90                this.broker = broker;
91                rListener = broker.getResponseListener();
92                dispatcher = broker.getEventDispatcher();
93                serializer = broker.getSerializer();
94
95                // TODO what is better to use a new channel or to use the same?
96                // this.channel = Broker.getChannel();
97                env = broker.getEnvironment();
98
99                // set the serializer type
100                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
101
102                listeners = new HashMap<String, EventListener<?>>();
103
104                // Create a new hashmap and registry it in rListener
105                results = new HashMap<String, byte[]>();
106                rListener.registerProxy(this);
107        }
108
109        @Override
110        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
111                // Local methods only
112                String methodName = method.getName();
113
114                // The local methods will be invoked here
115                if (method.getDeclaringClass().equals(Remote.class)) {
116                        if (methodName.equals("getRef")) {
117                                return getRef();
118                        } else if (methodName.equals("addListener")) {
119                                addListener((EventListener<?>) arguments[0]);
120                                return null;
121                        } else if (methodName.equals("removeListener")) {
122                                removeListener((EventListener<?>) arguments[0]);
123                                return null;
124                        } else if (methodName.equals("getListeners")) {
125                                return getListeners();
126                        }
127                }
128
129                // Create the request
130                Request request = createRequest(method, arguments);
131
132                Object response = null;
133                // Publish the request
134                if (request.isAsync()) {
135                        logger.debug("Publish async request -> " + request.getId());
136                        publishAsyncRequest(request);
137                } else {
138                        logger.debug("Publish sync request -> " + request.getId());
139                        response = publishSyncRequest(request, method.getReturnType());
140                }
141
142                return response;
143        }
144
145        private void publishMessage(Request request, String replyQueueName) throws Exception {
146                String corrId = request.getId();
147
148                // Get the environment properties
149                String exchange;
150                String routingkey;
151
152                if (request.isMulti()) {
153                        exchange = multi + env.getProperty(ParameterQueue.RPC_EXCHANGE);
154                        routingkey = "";
155                } else {
156                        exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
157                        routingkey = uid;
158                }
159
160                // Add the correlation ID and create a replyTo property
161                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
162
163                // Publish the message
164                byte[] bytesRequest = serializer.serialize(serializerType, request);
165                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
166        }
167
168        private void publishAsyncRequest(Request request) throws Exception {
169                // Get the environment properties
170                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
171                publishMessage(request, replyQueueName);
172        }
173
174        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
175                String corrId = request.getId();
176
177                int retries = request.getRetries();
178                long timeout = request.getTimeout();
179
180                // Get the environment properties
181                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
182
183                // Publish the message
184                int i = 0;
185                while (i < retries) {
186                        try {
187                                publishMessage(request, replyQueueName);
188                                if (request.isMulti()) {
189                                        return getResults(corrId, 2, timeout, type);
190                                } else {
191                                        return getResult(corrId, timeout, type);
192                                }
193
194                        } catch (TimeoutException te) {
195                                logger.error(te);
196                        }
197                        i++;
198                }
199                throw new RetryException(retries, timeout);
200        }
201
202        private Request createRequest(Method method, Object[] arguments) {
203                String corrId = java.util.UUID.randomUUID().toString();
204                String methodName = method.getName();
205                boolean multi = false;
206                int wait = 0;
207
208                if (method.getAnnotation(MultiMethod.class) != null) {
209                        multi = true;
210                        wait = method.getAnnotation(MultiMethod.class).waitNum();
211                }
212
213                // Since we need to know whether the method is async and if it has to
214                // return using an annotation, we'll only check the AsyncMethod
215                // annotation
216                if (method.getAnnotation(AsyncMethod.class) == null) {
217                        int retries = 1;
218                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
219                        if (method.getAnnotation(SyncMethod.class) != null) {
220                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
221                                retries = sync.retry();
222                                timeout = sync.timeout();
223                        }
224                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
225                } else {
226                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
227                }
228        }
229
230        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
231                Response resp = null;
232
233                // Wait for the results.
234                long localTimeout = timeout;
235                long start = System.currentTimeMillis();
236                synchronized (results) {
237                        // Due to we are using notifyAll(), we need to control the real time
238                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
239                                results.wait(localTimeout);
240                                localTimeout = System.currentTimeMillis() - start;
241                        }
242                        if ((timeout - localTimeout) <= 0) {
243                                throw new TimeoutException("Timeout exception time: " + timeout);
244                        }
245                        resp = serializer.deserializeResponse(results.get(corrId), type);
246
247                        // Remove and indicate the key exists (a hashmap can contain a null
248                        // object, using this we'll know whether a response has been
249                        // received before)
250                        results.put(corrId, null);
251                }
252
253                if (resp.getError() != null) {
254                        OmqException error = resp.getError();
255                        String name = error.getType();
256                        String message = error.getMessage();
257                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
258                }
259
260                return resp.getResult();
261        }
262
263        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
264                Response resp = null;
265                Class<?> actualType = type.getComponentType();
266
267                Object array = Array.newInstance(actualType, wait);
268
269                int i = 0;
270                long localTimeout = timeout;
271                long start = System.currentTimeMillis();
272
273                while (i < wait) {
274                        synchronized (results) {
275                                // Due to we are using notifyAll(), we need to control the real
276                                // time
277                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
278                                        results.wait(localTimeout);
279                                        localTimeout = System.currentTimeMillis() - start;
280                                }
281                                if ((timeout - localTimeout) <= 0) {
282                                        throw new TimeoutException("Timeout exception time: " + timeout);
283                                }
284                                // Remove the corrId to receive new replies
285                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
286                                Array.set(array, i, resp.getResult());
287                        }
288                        i++;
289                }
290                synchronized (results) {
291                        results.put(corrId, null);
292                }
293
294                return array;
295        }
296
297        /**
298         * Returns an instance of a proxy class for the specified interfaces that
299         * dispatches method invocations to the specified invocation handler. * @param
300         * loader
301         *
302         * @param loader
303         *            the class loader to define the proxy class
304         *
305         * @param interfaces
306         *            the list of interfaces for the proxy class to implement
307         * @param proxy
308         *            the invocation handler to dispatch method invocations to
309         * @return a proxy instance with the specified invocation handler of a proxy
310         *         class that is defined by the specified class loader and that
311         *         implements the specified interfaces
312         */
313        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
314                return Proxy.newProxyInstance(loader, interfaces, proxy);
315        }
316
317        /**
318         * Gets the Map used internally to retreive the response of the server
319         *
320         * @return a map with all the keys processed. Every key is a correlation id
321         *         of a method invoked remotely
322         */
323        public Map<String, byte[]> getResults() {
324                return results;
325        }
326
327        @Override
328        public String getRef() {
329                return uid;
330        }
331
332        @Override
333        public void notifyEvent(Event event) throws IOException, SerializerException {
334        }
335
336        @Override
337        public void addListener(EventListener<?> eventListener) throws Exception {
338                if (eventListener.getTopic() == null) {
339                        eventListener.setTopic(uid);
340                }
341                listeners.put(eventListener.getTopic(), eventListener);
342                dispatcher.addListener(eventListener);
343        }
344
345        @Override
346        public void removeListener(EventListener<?> eventListener) throws Exception {
347                listeners.remove(eventListener.getTopic());
348                dispatcher.removeListener(eventListener);
349        }
350
351        @Override
352        public Collection<EventListener<?>> getListeners() throws Exception {
353                return listeners.values();
354        }
355
356}
Note: See TracBrowser for help on using the repository browser.