source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 56

Last change on this file since 56 was 55, checked in by stoda, 11 years ago

@MultiMethod? implemented and working with @AsyncMethod? annotation.
TODO: @Multi with SyncMethod?(waitNum = x) -> Must return a List<?>
Refactoring in Proxymq if it's necessary

File size: 11.0 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.lang.reflect.Proxy;
7import java.util.Collection;
8import java.util.HashMap;
9import java.util.Hashtable;
10import java.util.Map;
11import java.util.Properties;
12
13import org.apache.log4j.Logger;
14
15import omq.Remote;
16import omq.client.annotation.AsyncMethod;
17import omq.client.annotation.MultiMethod;
18import omq.client.annotation.SyncMethod;
19import omq.client.listener.ResponseListener;
20import omq.common.broker.Broker;
21import omq.common.event.Event;
22import omq.common.event.EventDispatcher;
23import omq.common.event.EventListener;
24import omq.common.message.Request;
25import omq.common.message.Response;
26import omq.common.util.ParameterQueue;
27import omq.common.util.Serializer;
28import omq.exception.NoContainsInstanceException;
29import omq.exception.OmqException;
30import omq.exception.RetryException;
31import omq.exception.SerializerException;
32import omq.exception.TimeoutException;
33
34import com.rabbitmq.client.AMQP.BasicProperties;
35
36/**
37 * EvoProxy class. This class inherits from InvocationHandler and gives you a
38 * proxy with a server using an environment
39 *
40 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
41 *
42 */
43public class Proxymq implements InvocationHandler, Remote {
44
45        /**
46         *
47         */
48        private static final long serialVersionUID = 1L;
49        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
50        private static final String multi = "multi#";
51        private static Map<String, Object> proxies = new Hashtable<String, Object>();
52
53        private String uid;
54        private transient String serializerType;
55        private transient Broker broker;
56        private transient ResponseListener rListener;
57        private transient EventDispatcher dispatcher;
58        private transient Serializer serializer;
59        private transient Properties env;
60        private transient Map<String, byte[]> results;
61        private transient Map<String, EventListener<?>> listeners;
62
63        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
64        static {
65                primitiveClasses.put("byte", Byte.class);
66                primitiveClasses.put("short", Short.class);
67                primitiveClasses.put("char", Character.class);
68                primitiveClasses.put("int", Integer.class);
69                primitiveClasses.put("long", Long.class);
70                primitiveClasses.put("float", Float.class);
71                primitiveClasses.put("double", Double.class);
72        }
73
74        /**
75         * EvoProxy Constructor.
76         *
77         * This constructor uses an uid to know which object will call. It also uses
78         * Properties to set where to send the messages
79         *
80         * @param uid
81         *            The uid represents the unique identifier of a remote object
82         * @param clazz
83         *            It represents the real class of the remote object. With this
84         *            class the system can know the remoteInterface used and it can
85         *            also see which annotations are used
86         * @param env
87         *            The environment is used to know where to send the messages
88         * @throws Exception
89         */
90        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
91                this.uid = uid;
92                this.broker = broker;
93                rListener = broker.getResponseListener();
94                dispatcher = broker.getEventDispatcher();
95                serializer = broker.getSerializer();
96
97                // TODO what is better to use a new channel or to use the same?
98                // this.channel = Broker.getChannel();
99                env = broker.getEnvironment();
100
101                // set the serializer type
102                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
103
104                listeners = new HashMap<String, EventListener<?>>();
105
106                // Create a new hashmap and registry it in rListener
107                results = new HashMap<String, byte[]>();
108                rListener.registerProxy(this);
109        }
110
111        @Override
112        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
113                // Local methods only
114                String methodName = method.getName();
115
116                // The local methods will be invoked here
117                if (method.getDeclaringClass().equals(Remote.class)) {
118                        if (methodName.equals("getRef")) {
119                                return getRef();
120                        } else if (methodName.equals("addListener")) {
121                                addListener((EventListener<?>) arguments[0]);
122                                return null;
123                        } else if (methodName.equals("removeListener")) {
124                                removeListener((EventListener<?>) arguments[0]);
125                                return null;
126                        } else if (methodName.equals("getListeners")) {
127                                return getListeners();
128                        }
129                }
130
131                // Create the request
132                Request request = createRequest(method, arguments);
133
134                Object response = null;
135                // Publish the request
136                if (request.isAsync()) {
137                        logger.debug("Publish async request -> " + request.getId());
138                        publishAsyncRequest(request);
139                } else {
140                        logger.debug("Publish sync request -> " + request.getId());
141                        response = publishSyncRequest(request, method.getReturnType());
142                }
143
144                return response;
145        }
146
147        private void publishMessage(Request request, String replyQueueName) throws Exception {
148                String corrId = request.getId();
149
150                // Get the environment properties
151                String exchange;
152                String routingkey;
153
154                if (request.isMulti()) {
155                        exchange = multi + env.getProperty(ParameterQueue.RPC_EXCHANGE);
156                        routingkey = "";
157                } else {
158                        exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
159                        routingkey = uid;
160                }
161
162                // Add the correlation ID and create a replyTo property
163                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
164
165                // Publish the message
166                byte[] bytesRequest = serializer.serialize(serializerType, request);
167                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
168        }
169
170        private void publishAsyncRequest(Request request) throws Exception {
171                // Get the environment properties
172                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
173                publishMessage(request, replyQueueName);
174        }
175
176        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
177                String corrId = request.getId();
178
179                int retries = request.getRetries();
180                long timeout = request.getTimeout();
181
182                // Get the environment properties
183                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
184
185                // Publish the message
186                int i = 0;
187                while (i < retries) {
188                        try {
189                                publishMessage(request, replyQueueName);
190                                return getResult(corrId, timeout, type);
191                        } catch (TimeoutException te) {
192                                logger.error(te);
193                        }
194                        i++;
195                }
196                throw new RetryException(retries, timeout);
197        }
198
199        private Request createRequest(Method method, Object[] arguments) {
200                String corrId = java.util.UUID.randomUUID().toString();
201                String methodName = method.getName();
202                boolean multi = false;
203
204                if (method.getAnnotation(MultiMethod.class) != null) {
205                        multi = true;
206                }
207
208                // Since we need to know whether the method is async and if it has to
209                // return using an annotation, we'll only check the AsyncMethod
210                // annotation
211                if (method.getAnnotation(AsyncMethod.class) == null) {
212                        int retries = 1;
213                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
214                        if (method.getAnnotation(SyncMethod.class) != null) {
215                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
216                                retries = sync.retry();
217                                timeout = sync.timeout();
218                        }
219                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi);
220                } else {
221                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
222                }
223        }
224
225        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
226                Response resp = null;
227
228                // Wait for the results.
229                long localTimeout = 0;
230                long start = System.currentTimeMillis();
231                synchronized (results) {
232                        // Due to we are using notifyAll(), we need to control the real time
233                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
234                                results.wait(timeout);
235                                localTimeout = System.currentTimeMillis() - start;
236                        }
237                        if ((timeout - localTimeout) <= 0) {
238                                throw new TimeoutException("Timeout exception time: " + timeout);
239                        }
240                        resp = serializer.deserializeResponse(results.get(corrId), type);
241
242                        // Remove and indicate the key exists (a hashmap can contain a null
243                        // object, using this we'll know whether a response has been
244                        // received before)
245                        results.put(corrId, null);
246                }
247
248                if (resp.getError() != null) {
249                        OmqException error = resp.getError();
250                        String name = error.getType();
251                        String message = error.getMessage();
252                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
253                }
254
255                return resp.getResult();
256        }
257
258        /**
259         *
260         * @param reference
261         *            RemoteObject reference
262         * @return true if the proxy has been created before or false in the other
263         *         case
264         */
265        public static boolean containsProxy(String reference) {
266                return proxies.containsKey(reference);
267        }
268
269        /**
270         *
271         * @param reference
272         *            RemoteObject reference
273         * @return a proxy instance
274         * @throws NoContainsInstanceException
275         */
276        public static Object getInstance(String reference) throws NoContainsInstanceException {
277                if (!containsProxy(reference)) {
278                        throw new NoContainsInstanceException(reference);
279                }
280                return proxies.get(reference);
281        }
282
283        /**
284         * Returns an instance of a proxy class for the specified interfaces that
285         * dispatches method invocations to the specified invocation handler. * @param
286         * loader
287         *
288         * @param loader
289         *            the class loader to define the proxy class
290         *
291         * @param interfaces
292         *            the list of interfaces for the proxy class to implement
293         * @param proxy
294         *            the invocation handler to dispatch method invocations to
295         * @return a proxy instance with the specified invocation handler of a proxy
296         *         class that is defined by the specified class loader and that
297         *         implements the specified interfaces
298         */
299        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
300                if (proxies.containsKey(proxy.getRef())) {
301                        return proxies.get(proxy.getRef());
302                }
303                Object value = Proxy.newProxyInstance(loader, interfaces, proxy);
304                proxies.put(proxy.getRef(), value);
305                return value;
306        }
307
308        /**
309         * Gets the Map used internally to retreive the response of the server
310         *
311         * @return a map with all the keys processed. Every key is a correlation id
312         *         of a method invoked remotely
313         */
314        public Map<String, byte[]> getResults() {
315                return results;
316        }
317
318        public static void stopProxy() {
319                proxies = new HashMap<String, Object>();
320        }
321
322        public static Map<String, Object> getProxies() {
323                return proxies;
324        }
325
326        public static void setProxies(Map<String, Object> proxies) {
327                Proxymq.proxies = proxies;
328        }
329
330        @Override
331        public String getRef() {
332                return uid;
333        }
334
335        @Override
336        public void notifyEvent(Event event) throws IOException, SerializerException {
337        }
338
339        @Override
340        public void addListener(EventListener<?> eventListener) throws Exception {
341                if (eventListener.getTopic() == null) {
342                        eventListener.setTopic(uid);
343                }
344                listeners.put(eventListener.getTopic(), eventListener);
345                dispatcher.addListener(eventListener);
346        }
347
348        @Override
349        public void removeListener(EventListener<?> eventListener) throws Exception {
350                listeners.remove(eventListener.getTopic());
351                dispatcher.removeListener(eventListener);
352        }
353
354        @Override
355        public Collection<EventListener<?>> getListeners() throws Exception {
356                return listeners.values();
357        }
358
359}
Note: See TracBrowser for help on using the repository browser.