source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 57

Last change on this file since 57 was 57, checked in by stoda, 11 years ago

New function created: getResults which returns a list

File size: 12.0 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.lang.reflect.Proxy;
7import java.util.ArrayList;
8import java.util.Collection;
9import java.util.HashMap;
10import java.util.Hashtable;
11import java.util.List;
12import java.util.Map;
13import java.util.Properties;
14
15import org.apache.log4j.Logger;
16
17import omq.Remote;
18import omq.client.annotation.AsyncMethod;
19import omq.client.annotation.MultiMethod;
20import omq.client.annotation.SyncMethod;
21import omq.client.listener.ResponseListener;
22import omq.common.broker.Broker;
23import omq.common.event.Event;
24import omq.common.event.EventDispatcher;
25import omq.common.event.EventListener;
26import omq.common.message.Request;
27import omq.common.message.Response;
28import omq.common.util.ParameterQueue;
29import omq.common.util.Serializer;
30import omq.exception.NoContainsInstanceException;
31import omq.exception.OmqException;
32import omq.exception.RetryException;
33import omq.exception.SerializerException;
34import omq.exception.TimeoutException;
35
36import com.rabbitmq.client.AMQP.BasicProperties;
37
38/**
39 * EvoProxy class. This class inherits from InvocationHandler and gives you a
40 * proxy with a server using an environment
41 *
42 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
43 *
44 */
45public class Proxymq implements InvocationHandler, Remote {
46
47        /**
48         *
49         */
50        private static final long serialVersionUID = 1L;
51        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
52        private static final String multi = "multi#";
53        private static Map<String, Object> proxies = new Hashtable<String, Object>();
54
55        private String uid;
56        private transient String serializerType;
57        private transient Broker broker;
58        private transient ResponseListener rListener;
59        private transient EventDispatcher dispatcher;
60        private transient Serializer serializer;
61        private transient Properties env;
62        private transient Map<String, byte[]> results;
63        private transient Map<String, EventListener<?>> listeners;
64
65        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
66        static {
67                primitiveClasses.put("byte", Byte.class);
68                primitiveClasses.put("short", Short.class);
69                primitiveClasses.put("char", Character.class);
70                primitiveClasses.put("int", Integer.class);
71                primitiveClasses.put("long", Long.class);
72                primitiveClasses.put("float", Float.class);
73                primitiveClasses.put("double", Double.class);
74        }
75
76        /**
77         * EvoProxy Constructor.
78         *
79         * This constructor uses an uid to know which object will call. It also uses
80         * Properties to set where to send the messages
81         *
82         * @param uid
83         *            The uid represents the unique identifier of a remote object
84         * @param clazz
85         *            It represents the real class of the remote object. With this
86         *            class the system can know the remoteInterface used and it can
87         *            also see which annotations are used
88         * @param env
89         *            The environment is used to know where to send the messages
90         * @throws Exception
91         */
92        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
93                this.uid = uid;
94                this.broker = broker;
95                rListener = broker.getResponseListener();
96                dispatcher = broker.getEventDispatcher();
97                serializer = broker.getSerializer();
98
99                // TODO what is better to use a new channel or to use the same?
100                // this.channel = Broker.getChannel();
101                env = broker.getEnvironment();
102
103                // set the serializer type
104                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
105
106                listeners = new HashMap<String, EventListener<?>>();
107
108                // Create a new hashmap and registry it in rListener
109                results = new HashMap<String, byte[]>();
110                rListener.registerProxy(this);
111        }
112
113        @Override
114        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
115                // Local methods only
116                String methodName = method.getName();
117
118                // The local methods will be invoked here
119                if (method.getDeclaringClass().equals(Remote.class)) {
120                        if (methodName.equals("getRef")) {
121                                return getRef();
122                        } else if (methodName.equals("addListener")) {
123                                addListener((EventListener<?>) arguments[0]);
124                                return null;
125                        } else if (methodName.equals("removeListener")) {
126                                removeListener((EventListener<?>) arguments[0]);
127                                return null;
128                        } else if (methodName.equals("getListeners")) {
129                                return getListeners();
130                        }
131                }
132
133                // Create the request
134                Request request = createRequest(method, arguments);
135
136                Object response = null;
137                // Publish the request
138                if (request.isAsync()) {
139                        logger.debug("Publish async request -> " + request.getId());
140                        publishAsyncRequest(request);
141                } else {
142                        logger.debug("Publish sync request -> " + request.getId());
143                        response = publishSyncRequest(request, method.getReturnType());
144                }
145
146                return response;
147        }
148
149        private void publishMessage(Request request, String replyQueueName) throws Exception {
150                String corrId = request.getId();
151
152                // Get the environment properties
153                String exchange;
154                String routingkey;
155
156                if (request.isMulti()) {
157                        exchange = multi + env.getProperty(ParameterQueue.RPC_EXCHANGE);
158                        routingkey = "";
159                } else {
160                        exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
161                        routingkey = uid;
162                }
163
164                // Add the correlation ID and create a replyTo property
165                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
166
167                // Publish the message
168                byte[] bytesRequest = serializer.serialize(serializerType, request);
169                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
170        }
171
172        private void publishAsyncRequest(Request request) throws Exception {
173                // Get the environment properties
174                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
175                publishMessage(request, replyQueueName);
176        }
177
178        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
179                String corrId = request.getId();
180
181                int retries = request.getRetries();
182                long timeout = request.getTimeout();
183
184                // Get the environment properties
185                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
186
187                // Publish the message
188                int i = 0;
189                while (i < retries) {
190                        try {
191                                publishMessage(request, replyQueueName);
192                                return getResult(corrId, timeout, type);
193                        } catch (TimeoutException te) {
194                                logger.error(te);
195                        }
196                        i++;
197                }
198                throw new RetryException(retries, timeout);
199        }
200
201        private Request createRequest(Method method, Object[] arguments) {
202                String corrId = java.util.UUID.randomUUID().toString();
203                String methodName = method.getName();
204                boolean multi = false;
205
206                if (method.getAnnotation(MultiMethod.class) != null) {
207                        multi = true;
208                }
209
210                // Since we need to know whether the method is async and if it has to
211                // return using an annotation, we'll only check the AsyncMethod
212                // annotation
213                if (method.getAnnotation(AsyncMethod.class) == null) {
214                        int retries = 1;
215                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
216                        if (method.getAnnotation(SyncMethod.class) != null) {
217                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
218                                retries = sync.retry();
219                                timeout = sync.timeout();
220                        }
221                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi);
222                } else {
223                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
224                }
225        }
226
227        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
228                Response resp = null;
229
230                // Wait for the results.
231                long localTimeout = 0;
232                long start = System.currentTimeMillis();
233                synchronized (results) {
234                        // Due to we are using notifyAll(), we need to control the real time
235                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
236                                results.wait(timeout);
237                                localTimeout = System.currentTimeMillis() - start;
238                        }
239                        if ((timeout - localTimeout) <= 0) {
240                                throw new TimeoutException("Timeout exception time: " + timeout);
241                        }
242                        resp = serializer.deserializeResponse(results.get(corrId), type);
243
244                        // Remove and indicate the key exists (a hashmap can contain a null
245                        // object, using this we'll know whether a response has been
246                        // received before)
247                        results.put(corrId, null);
248                }
249
250                if (resp.getError() != null) {
251                        OmqException error = resp.getError();
252                        String name = error.getType();
253                        String message = error.getMessage();
254                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
255                }
256
257                return resp.getResult();
258        }
259
260        @SuppressWarnings("unused")
261        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
262                Response resp = null;
263                List<Object> list = new ArrayList<Object>();
264
265                int i = 0;
266                long localTimeout = timeout;
267                long start = System.currentTimeMillis();
268
269                while (i < wait) {
270                        synchronized (results) {
271                                // Due to we are using notifyAll(), we need to control the real
272                                // time
273                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
274                                        results.wait(localTimeout);
275                                        localTimeout = System.currentTimeMillis() - start;
276                                }
277                                if ((timeout - localTimeout) <= 0) {
278                                        throw new TimeoutException("Timeout exception time: " + timeout);
279                                }
280                                // Remove the corrId to receive new replies
281                                resp = serializer.deserializeResponse(results.remove(corrId), type);
282                                list.add(resp.getResult());
283
284                        }
285                        i++;
286                }
287                synchronized (results) {
288                        results.put(corrId, null);
289                }
290
291                return list;
292        }
293
294        /**
295         *
296         * @param reference
297         *            RemoteObject reference
298         * @return true if the proxy has been created before or false in the other
299         *         case
300         */
301        public static boolean containsProxy(String reference) {
302                return proxies.containsKey(reference);
303        }
304
305        /**
306         *
307         * @param reference
308         *            RemoteObject reference
309         * @return a proxy instance
310         * @throws NoContainsInstanceException
311         */
312        public static Object getInstance(String reference) throws NoContainsInstanceException {
313                if (!containsProxy(reference)) {
314                        throw new NoContainsInstanceException(reference);
315                }
316                return proxies.get(reference);
317        }
318
319        /**
320         * Returns an instance of a proxy class for the specified interfaces that
321         * dispatches method invocations to the specified invocation handler. * @param
322         * loader
323         *
324         * @param loader
325         *            the class loader to define the proxy class
326         *
327         * @param interfaces
328         *            the list of interfaces for the proxy class to implement
329         * @param proxy
330         *            the invocation handler to dispatch method invocations to
331         * @return a proxy instance with the specified invocation handler of a proxy
332         *         class that is defined by the specified class loader and that
333         *         implements the specified interfaces
334         */
335        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
336                if (proxies.containsKey(proxy.getRef())) {
337                        return proxies.get(proxy.getRef());
338                }
339                Object value = Proxy.newProxyInstance(loader, interfaces, proxy);
340                proxies.put(proxy.getRef(), value);
341                return value;
342        }
343
344        /**
345         * Gets the Map used internally to retreive the response of the server
346         *
347         * @return a map with all the keys processed. Every key is a correlation id
348         *         of a method invoked remotely
349         */
350        public Map<String, byte[]> getResults() {
351                return results;
352        }
353
354        public static void stopProxy() {
355                proxies = new HashMap<String, Object>();
356        }
357
358        public static Map<String, Object> getProxies() {
359                return proxies;
360        }
361
362        public static void setProxies(Map<String, Object> proxies) {
363                Proxymq.proxies = proxies;
364        }
365
366        @Override
367        public String getRef() {
368                return uid;
369        }
370
371        @Override
372        public void notifyEvent(Event event) throws IOException, SerializerException {
373        }
374
375        @Override
376        public void addListener(EventListener<?> eventListener) throws Exception {
377                if (eventListener.getTopic() == null) {
378                        eventListener.setTopic(uid);
379                }
380                listeners.put(eventListener.getTopic(), eventListener);
381                dispatcher.addListener(eventListener);
382        }
383
384        @Override
385        public void removeListener(EventListener<?> eventListener) throws Exception {
386                listeners.remove(eventListener.getTopic());
387                dispatcher.removeListener(eventListener);
388        }
389
390        @Override
391        public Collection<EventListener<?>> getListeners() throws Exception {
392                return listeners.values();
393        }
394
395}
Note: See TracBrowser for help on using the repository browser.