source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 54

Last change on this file since 54 was 54, checked in by stoda, 11 years ago

Adding @MultiMethod?
Broker is not a singleton.

File size: 11.0 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.lang.reflect.Proxy;
7import java.util.Collection;
8import java.util.HashMap;
9import java.util.Hashtable;
10import java.util.Map;
11import java.util.Properties;
12
13import org.apache.log4j.Logger;
14
15import omq.Remote;
16import omq.client.annotation.AsyncMethod;
17import omq.client.annotation.MultiMethod;
18import omq.client.annotation.SyncMethod;
19import omq.client.listener.ResponseListener;
20import omq.common.broker.Broker;
21import omq.common.event.Event;
22import omq.common.event.EventDispatcher;
23import omq.common.event.EventListener;
24import omq.common.message.Request;
25import omq.common.message.Response;
26import omq.common.util.ParameterQueue;
27import omq.common.util.Serializer;
28import omq.exception.NoContainsInstanceException;
29import omq.exception.OmqException;
30import omq.exception.RetryException;
31import omq.exception.SerializerException;
32import omq.exception.TimeoutException;
33
34import com.rabbitmq.client.AMQP.BasicProperties;
35
36/**
37 * EvoProxy class. This class inherits from InvocationHandler and gives you a
38 * proxy with a server using an environment
39 *
40 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
41 *
42 */
43public class Proxymq implements InvocationHandler, Remote {
44
45        /**
46         *
47         */
48        private static final long serialVersionUID = 1L;
49        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
50        private static Map<String, Object> proxies = new Hashtable<String, Object>();
51
52        private String uid;
53        private transient String serializerType;
54        private transient Broker broker;
55        private transient ResponseListener rListener;
56        private transient EventDispatcher dispatcher;
57        private transient Serializer serializer;
58        // private transient Channel channel;
59        private transient Properties env;
60        private transient Map<String, byte[]> results;
61        private transient Map<String, EventListener<?>> listeners;
62
63        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
64        static {
65                primitiveClasses.put("byte", Byte.class);
66                primitiveClasses.put("short", Short.class);
67                primitiveClasses.put("char", Character.class);
68                primitiveClasses.put("int", Integer.class);
69                primitiveClasses.put("long", Long.class);
70                primitiveClasses.put("float", Float.class);
71                primitiveClasses.put("double", Double.class);
72        }
73
74        /**
75         * EvoProxy Constructor.
76         *
77         * This constructor uses an uid to know which object will call. It also uses
78         * Properties to set where to send the messages
79         *
80         * @param uid
81         *            The uid represents the unique identifier of a remote object
82         * @param clazz
83         *            It represents the real class of the remote object. With this
84         *            class the system can know the remoteInterface used and it can
85         *            also see which annotations are used
86         * @param env
87         *            The environment is used to know where to send the messages
88         * @throws Exception
89         */
90        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
91                this.uid = uid;
92                this.broker = broker;
93                rListener = broker.getResponseListener();
94                dispatcher = broker.getEventDispatcher();
95                serializer = broker.getSerializer();
96
97                // TODO what is better to use a new channel or to use the same?
98                // this.channel = Broker.getChannel();
99                env = broker.getEnvironment();
100
101                // set the serializer type
102                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
103
104                listeners = new HashMap<String, EventListener<?>>();
105
106                // Create a new hashmap and registry it in rListener
107                results = new HashMap<String, byte[]>();
108                rListener.registerProxy(this);
109        }
110
111        @Override
112        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
113                // long timeStart = (new Date()).getTime();
114
115                // Local methods only
116                String methodName = method.getName();
117
118                // The local methods will be invoked here
119                if (method.getDeclaringClass().equals(Remote.class)) {
120                        if (methodName.equals("getRef")) {
121                                return getRef();
122                        } else if (methodName.equals("addListener")) {
123                                addListener((EventListener<?>) arguments[0]);
124                                return null;
125                        } else if (methodName.equals("removeListener")) {
126                                removeListener((EventListener<?>) arguments[0]);
127                                return null;
128                        } else if (methodName.equals("getListeners")) {
129                                return getListeners();
130                        }
131                }
132
133                // Create the request
134                Request request = createRequest(method, arguments);
135
136                Object response = null;
137                // Publish the request
138                if (request.isAsync()) {
139                        logger.debug("Publish async request -> " + request.getId());
140                        publishAsyncRequest(request);
141                } else {
142                        logger.debug("Publish sync request -> " + request.getId());
143                        response = publishSyncRequest(request, method.getReturnType());
144                }
145
146                return response;
147        }
148
149        private void publishMessage(Request request, String replyQueueName) throws Exception {
150                String corrId = request.getId();
151
152                // Get the environment properties
153                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
154                String routingkey = this.uid;
155
156                // Add the correlation ID and create a replyTo property
157                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
158
159                // Publish the message
160                byte[] bytesRequest = serializer.serialize(serializerType, request);
161                // TODO See this
162                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
163                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
164        }
165
166        private void publishAsyncRequest(Request request) throws Exception {
167                // Get the environment properties
168                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
169                publishMessage(request, replyQueueName);
170        }
171
172        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
173                String corrId = request.getId();
174
175                int retries = request.getRetries();
176                long timeout = request.getTimeout();
177
178                // Get the environment properties
179                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
180
181                // Publish the message
182                int i = 0;
183                while (i < retries) {
184                        try {
185                                publishMessage(request, replyQueueName);
186                                return getResult(corrId, timeout, type);
187                        } catch (TimeoutException te) {
188                                logger.error(te);
189                        }
190                        i++;
191                }
192                throw new RetryException(retries, timeout);
193        }
194
195        private Request createRequest(Method method, Object[] arguments) {
196                String corrId = java.util.UUID.randomUUID().toString();
197                String methodName = method.getName();
198                boolean multi = false;
199
200                if (method.getAnnotation(MultiMethod.class) != null) {
201                        multi = true;
202                }
203
204                // Since we need to know whether the method is async and if it has to
205                // return using an annotation, we'll only check the AsyncMethod
206                // annotation
207                if (method.getAnnotation(AsyncMethod.class) == null) {
208                        int retries = 1;
209                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
210                        if (method.getAnnotation(SyncMethod.class) != null) {
211                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
212                                retries = sync.retry();
213                                timeout = sync.timeout();
214                        }
215                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout);
216                } else {
217                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
218                }
219        }
220
221        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
222                Response resp = null;
223
224                // Wait for the results.
225                long localTimeout = 0;
226                long start = System.currentTimeMillis();
227                synchronized (results) {
228                        // Due to we are using notifyAll(), we need to control the real time
229                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
230                                results.wait(timeout);
231                                localTimeout = System.currentTimeMillis() - start;
232                        }
233                        if ((timeout - localTimeout) <= 0) {
234                                throw new TimeoutException("Timeout exception time: " + timeout);
235                        }
236                        resp = serializer.deserializeResponse(results.get(corrId), type);
237
238                        // Remove and indicate the key exists (a hashmap can contain a null
239                        // object, using this we'll know whether a response has been
240                        // received before)
241                        results.put(corrId, null);
242                }
243
244                if (resp.getError() != null) {
245                        OmqException error = resp.getError();
246                        String name = error.getType();
247                        String message = error.getMessage();
248                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
249                }
250
251                return resp.getResult();
252        }
253
254        /**
255         *
256         * @param reference
257         *            RemoteObject reference
258         * @return true if the proxy has been created before or false in the other
259         *         case
260         */
261        public static boolean containsProxy(String reference) {
262                return proxies.containsKey(reference);
263        }
264
265        /**
266         *
267         * @param reference
268         *            RemoteObject reference
269         * @return a proxy instance
270         * @throws NoContainsInstanceException
271         */
272        public static Object getInstance(String reference) throws NoContainsInstanceException {
273                if (!containsProxy(reference)) {
274                        throw new NoContainsInstanceException(reference);
275                }
276                return proxies.get(reference);
277        }
278
279        /**
280         * Returns an instance of a proxy class for the specified interfaces that
281         * dispatches method invocations to the specified invocation handler. * @param
282         * loader
283         *
284         * @param loader
285         *            the class loader to define the proxy class
286         *
287         * @param interfaces
288         *            the list of interfaces for the proxy class to implement
289         * @param proxy
290         *            the invocation handler to dispatch method invocations to
291         * @return a proxy instance with the specified invocation handler of a proxy
292         *         class that is defined by the specified class loader and that
293         *         implements the specified interfaces
294         */
295        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
296                if (proxies.containsKey(proxy.getRef())) {
297                        return proxies.get(proxy.getRef());
298                }
299                Object value = Proxy.newProxyInstance(loader, interfaces, proxy);
300                proxies.put(proxy.getRef(), value);
301                return value;
302        }
303
304        /**
305         * Gets the Map used internally to retreive the response of the server
306         *
307         * @return a map with all the keys processed. Every key is a correlation id
308         *         of a method invoked remotely
309         */
310        public Map<String, byte[]> getResults() {
311                return results;
312        }
313
314        public static void stopProxy() {
315                proxies = new HashMap<String, Object>();
316        }
317
318        public static Map<String, Object> getProxies() {
319                return proxies;
320        }
321
322        public static void setProxies(Map<String, Object> proxies) {
323                Proxymq.proxies = proxies;
324        }
325
326        @Override
327        public String getRef() {
328                return uid;
329        }
330
331        @Override
332        public void notifyEvent(Event event) throws IOException, SerializerException {
333        }
334
335        @Override
336        public void addListener(EventListener<?> eventListener) throws Exception {
337                if (eventListener.getTopic() == null) {
338                        eventListener.setTopic(uid);
339                }
340                listeners.put(eventListener.getTopic(), eventListener);
341                dispatcher.addListener(eventListener);
342        }
343
344        @Override
345        public void removeListener(EventListener<?> eventListener) throws Exception {
346                listeners.remove(eventListener.getTopic());
347                dispatcher.removeListener(eventListener);
348        }
349
350        @Override
351        public Collection<EventListener<?>> getListeners() throws Exception {
352                return listeners.values();
353        }
354
355}
Note: See TracBrowser for help on using the repository browser.