source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 58

Last change on this file since 58 was 58, checked in by stoda, 11 years ago

@MultiMethod? + @SyncMethod? implemented and tested

File size: 12.3 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.Array;
5import java.lang.reflect.InvocationHandler;
6import java.lang.reflect.Method;
7import java.lang.reflect.Proxy;
8import java.util.Collection;
9import java.util.HashMap;
10import java.util.Hashtable;
11import java.util.Map;
12import java.util.Properties;
13
14import omq.Remote;
15import omq.client.annotation.AsyncMethod;
16import omq.client.annotation.MultiMethod;
17import omq.client.annotation.SyncMethod;
18import omq.client.listener.ResponseListener;
19import omq.common.broker.Broker;
20import omq.common.event.Event;
21import omq.common.event.EventDispatcher;
22import omq.common.event.EventListener;
23import omq.common.message.Request;
24import omq.common.message.Response;
25import omq.common.util.ParameterQueue;
26import omq.common.util.Serializer;
27import omq.exception.NoContainsInstanceException;
28import omq.exception.OmqException;
29import omq.exception.RetryException;
30import omq.exception.SerializerException;
31import omq.exception.TimeoutException;
32
33import org.apache.log4j.Logger;
34
35import com.rabbitmq.client.AMQP.BasicProperties;
36
37/**
38 * EvoProxy class. This class inherits from InvocationHandler and gives you a
39 * proxy with a server using an environment
40 *
41 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
42 *
43 */
44public class Proxymq implements InvocationHandler, Remote {
45
46        /**
47         *
48         */
49        private static final long serialVersionUID = 1L;
50        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
51        private static final String multi = "multi#";
52        private static Map<String, Object> proxies = new Hashtable<String, Object>();
53
54        private String uid;
55        private transient String serializerType;
56        private transient Broker broker;
57        private transient ResponseListener rListener;
58        private transient EventDispatcher dispatcher;
59        private transient Serializer serializer;
60        private transient Properties env;
61        private transient Map<String, byte[]> results;
62        private transient Map<String, EventListener<?>> listeners;
63
64        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
65        static {
66                primitiveClasses.put("byte", Byte.class);
67                primitiveClasses.put("short", Short.class);
68                primitiveClasses.put("char", Character.class);
69                primitiveClasses.put("int", Integer.class);
70                primitiveClasses.put("long", Long.class);
71                primitiveClasses.put("float", Float.class);
72                primitiveClasses.put("double", Double.class);
73        }
74
75        /**
76         * EvoProxy Constructor.
77         *
78         * This constructor uses an uid to know which object will call. It also uses
79         * Properties to set where to send the messages
80         *
81         * @param uid
82         *            The uid represents the unique identifier of a remote object
83         * @param clazz
84         *            It represents the real class of the remote object. With this
85         *            class the system can know the remoteInterface used and it can
86         *            also see which annotations are used
87         * @param env
88         *            The environment is used to know where to send the messages
89         * @throws Exception
90         */
91        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
92                this.uid = uid;
93                this.broker = broker;
94                rListener = broker.getResponseListener();
95                dispatcher = broker.getEventDispatcher();
96                serializer = broker.getSerializer();
97
98                // TODO what is better to use a new channel or to use the same?
99                // this.channel = Broker.getChannel();
100                env = broker.getEnvironment();
101
102                // set the serializer type
103                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
104
105                listeners = new HashMap<String, EventListener<?>>();
106
107                // Create a new hashmap and registry it in rListener
108                results = new HashMap<String, byte[]>();
109                rListener.registerProxy(this);
110        }
111
112        @Override
113        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
114                // Local methods only
115                String methodName = method.getName();
116
117                // The local methods will be invoked here
118                if (method.getDeclaringClass().equals(Remote.class)) {
119                        if (methodName.equals("getRef")) {
120                                return getRef();
121                        } else if (methodName.equals("addListener")) {
122                                addListener((EventListener<?>) arguments[0]);
123                                return null;
124                        } else if (methodName.equals("removeListener")) {
125                                removeListener((EventListener<?>) arguments[0]);
126                                return null;
127                        } else if (methodName.equals("getListeners")) {
128                                return getListeners();
129                        }
130                }
131
132                // Create the request
133                Request request = createRequest(method, arguments);
134
135                Object response = null;
136                // Publish the request
137                if (request.isAsync()) {
138                        logger.debug("Publish async request -> " + request.getId());
139                        publishAsyncRequest(request);
140                } else {
141                        logger.debug("Publish sync request -> " + request.getId());
142                        response = publishSyncRequest(request, method.getReturnType());
143                }
144
145                return response;
146        }
147
148        private void publishMessage(Request request, String replyQueueName) throws Exception {
149                String corrId = request.getId();
150
151                // Get the environment properties
152                String exchange;
153                String routingkey;
154
155                if (request.isMulti()) {
156                        exchange = multi + env.getProperty(ParameterQueue.RPC_EXCHANGE);
157                        routingkey = "";
158                } else {
159                        exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
160                        routingkey = uid;
161                }
162
163                // Add the correlation ID and create a replyTo property
164                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
165
166                // Publish the message
167                byte[] bytesRequest = serializer.serialize(serializerType, request);
168                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
169        }
170
171        private void publishAsyncRequest(Request request) throws Exception {
172                // Get the environment properties
173                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
174                publishMessage(request, replyQueueName);
175        }
176
177        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
178                String corrId = request.getId();
179
180                int retries = request.getRetries();
181                long timeout = request.getTimeout();
182
183                // Get the environment properties
184                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
185
186                // Publish the message
187                int i = 0;
188                while (i < retries) {
189                        try {
190                                publishMessage(request, replyQueueName);
191                                if (request.isMulti()) {
192                                        return getResults(corrId, 2, timeout, type);
193                                } else {
194                                        return getResult(corrId, timeout, type);
195                                }
196
197                        } catch (TimeoutException te) {
198                                logger.error(te);
199                        }
200                        i++;
201                }
202                throw new RetryException(retries, timeout);
203        }
204
205        private Request createRequest(Method method, Object[] arguments) {
206                String corrId = java.util.UUID.randomUUID().toString();
207                String methodName = method.getName();
208                boolean multi = false;
209                int wait = 0;
210
211                if (method.getAnnotation(MultiMethod.class) != null) {
212                        multi = true;
213                        wait = method.getAnnotation(MultiMethod.class).waitNum();
214                }
215
216                // Since we need to know whether the method is async and if it has to
217                // return using an annotation, we'll only check the AsyncMethod
218                // annotation
219                if (method.getAnnotation(AsyncMethod.class) == null) {
220                        int retries = 1;
221                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
222                        if (method.getAnnotation(SyncMethod.class) != null) {
223                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
224                                retries = sync.retry();
225                                timeout = sync.timeout();
226                        }
227                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
228                } else {
229                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
230                }
231        }
232
233        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
234                Response resp = null;
235
236                // Wait for the results.
237                long localTimeout = timeout;
238                long start = System.currentTimeMillis();
239                synchronized (results) {
240                        // Due to we are using notifyAll(), we need to control the real time
241                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
242                                results.wait(localTimeout);
243                                localTimeout = System.currentTimeMillis() - start;
244                        }
245                        if ((timeout - localTimeout) <= 0) {
246                                throw new TimeoutException("Timeout exception time: " + timeout);
247                        }
248                        resp = serializer.deserializeResponse(results.get(corrId), type);
249
250                        // Remove and indicate the key exists (a hashmap can contain a null
251                        // object, using this we'll know whether a response has been
252                        // received before)
253                        results.put(corrId, null);
254                }
255
256                if (resp.getError() != null) {
257                        OmqException error = resp.getError();
258                        String name = error.getType();
259                        String message = error.getMessage();
260                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
261                }
262
263                return resp.getResult();
264        }
265
266        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
267                Response resp = null;
268                Class<?> actualType = type.getComponentType();
269
270                Object array = Array.newInstance(actualType, wait);
271
272                int i = 0;
273                long localTimeout = timeout;
274                long start = System.currentTimeMillis();
275
276                while (i < wait) {
277                        synchronized (results) {
278                                // Due to we are using notifyAll(), we need to control the real
279                                // time
280                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
281                                        results.wait(localTimeout);
282                                        localTimeout = System.currentTimeMillis() - start;
283                                }
284                                if ((timeout - localTimeout) <= 0) {
285                                        throw new TimeoutException("Timeout exception time: " + timeout);
286                                }
287                                // Remove the corrId to receive new replies
288                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
289                                System.out.println("/n/n/n/n/nResult type: "+resp.getResult()+" /n/n/n/n/n");
290                                Array.set(array, i, resp.getResult());
291                        }
292                        i++;
293                }
294                synchronized (results) {
295                        results.put(corrId, null);
296                }
297
298                return array;
299        }
300
301        /**
302         *
303         * @param reference
304         *            RemoteObject reference
305         * @return true if the proxy has been created before or false in the other
306         *         case
307         */
308        public static boolean containsProxy(String reference) {
309                return proxies.containsKey(reference);
310        }
311
312        /**
313         *
314         * @param reference
315         *            RemoteObject reference
316         * @return a proxy instance
317         * @throws NoContainsInstanceException
318         */
319        public static Object getInstance(String reference) throws NoContainsInstanceException {
320                if (!containsProxy(reference)) {
321                        throw new NoContainsInstanceException(reference);
322                }
323                return proxies.get(reference);
324        }
325
326        /**
327         * Returns an instance of a proxy class for the specified interfaces that
328         * dispatches method invocations to the specified invocation handler. * @param
329         * loader
330         *
331         * @param loader
332         *            the class loader to define the proxy class
333         *
334         * @param interfaces
335         *            the list of interfaces for the proxy class to implement
336         * @param proxy
337         *            the invocation handler to dispatch method invocations to
338         * @return a proxy instance with the specified invocation handler of a proxy
339         *         class that is defined by the specified class loader and that
340         *         implements the specified interfaces
341         */
342        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
343                if (proxies.containsKey(proxy.getRef())) {
344                        return proxies.get(proxy.getRef());
345                }
346                Object value = Proxy.newProxyInstance(loader, interfaces, proxy);
347                proxies.put(proxy.getRef(), value);
348                return value;
349        }
350
351        /**
352         * Gets the Map used internally to retreive the response of the server
353         *
354         * @return a map with all the keys processed. Every key is a correlation id
355         *         of a method invoked remotely
356         */
357        public Map<String, byte[]> getResults() {
358                return results;
359        }
360
361        public static void stopProxy() {
362                proxies = new HashMap<String, Object>();
363        }
364
365        public static Map<String, Object> getProxies() {
366                return proxies;
367        }
368
369        public static void setProxies(Map<String, Object> proxies) {
370                Proxymq.proxies = proxies;
371        }
372
373        @Override
374        public String getRef() {
375                return uid;
376        }
377
378        @Override
379        public void notifyEvent(Event event) throws IOException, SerializerException {
380        }
381
382        @Override
383        public void addListener(EventListener<?> eventListener) throws Exception {
384                if (eventListener.getTopic() == null) {
385                        eventListener.setTopic(uid);
386                }
387                listeners.put(eventListener.getTopic(), eventListener);
388                dispatcher.addListener(eventListener);
389        }
390
391        @Override
392        public void removeListener(EventListener<?> eventListener) throws Exception {
393                listeners.remove(eventListener.getTopic());
394                dispatcher.removeListener(eventListener);
395        }
396
397        @Override
398        public Collection<EventListener<?>> getListeners() throws Exception {
399                return listeners.values();
400        }
401
402}
Note: See TracBrowser for help on using the repository browser.