source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 70

Last change on this file since 70 was 70, checked in by stoda, 11 years ago

MultiProxymq? added

File size: 10.1 KB
RevLine 
[44]1package omq.client.proxy;
2
3import java.io.IOException;
[58]4import java.lang.reflect.Array;
[44]5import java.lang.reflect.InvocationHandler;
6import java.lang.reflect.Method;
7import java.util.Collection;
8import java.util.HashMap;
9import java.util.Map;
10import java.util.Properties;
11
12import omq.Remote;
13import omq.client.annotation.AsyncMethod;
[54]14import omq.client.annotation.MultiMethod;
[44]15import omq.client.annotation.SyncMethod;
16import omq.client.listener.ResponseListener;
17import omq.common.broker.Broker;
18import omq.common.event.Event;
19import omq.common.event.EventDispatcher;
20import omq.common.event.EventListener;
21import omq.common.message.Request;
22import omq.common.message.Response;
23import omq.common.util.ParameterQueue;
24import omq.common.util.Serializer;
25import omq.exception.OmqException;
26import omq.exception.RetryException;
27import omq.exception.SerializerException;
28import omq.exception.TimeoutException;
29
[58]30import org.apache.log4j.Logger;
31
[44]32import com.rabbitmq.client.AMQP.BasicProperties;
33
34/**
35 * EvoProxy class. This class inherits from InvocationHandler and gives you a
36 * proxy with a server using an environment
37 *
38 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
39 *
40 */
41public class Proxymq implements InvocationHandler, Remote {
42
43        /**
44         *
45         */
46        private static final long serialVersionUID = 1L;
[49]47        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
[55]48        private static final String multi = "multi#";
[44]49
50        private String uid;
[70]51        private transient String exchange;
52        private transient String multiExchange;
53        private transient String replyQueueName;
[47]54        private transient String serializerType;
[53]55        private transient Broker broker;
[44]56        private transient ResponseListener rListener;
57        private transient EventDispatcher dispatcher;
[53]58        private transient Serializer serializer;
[44]59        private transient Properties env;
60        private transient Map<String, byte[]> results;
61        private transient Map<String, EventListener<?>> listeners;
62
63        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
64        static {
65                primitiveClasses.put("byte", Byte.class);
66                primitiveClasses.put("short", Short.class);
67                primitiveClasses.put("char", Character.class);
68                primitiveClasses.put("int", Integer.class);
69                primitiveClasses.put("long", Long.class);
70                primitiveClasses.put("float", Float.class);
71                primitiveClasses.put("double", Double.class);
72        }
73
74        /**
75         * EvoProxy Constructor.
76         *
77         * This constructor uses an uid to know which object will call. It also uses
78         * Properties to set where to send the messages
79         *
80         * @param uid
81         *            The uid represents the unique identifier of a remote object
82         * @param clazz
83         *            It represents the real class of the remote object. With this
84         *            class the system can know the remoteInterface used and it can
85         *            also see which annotations are used
86         * @param env
87         *            The environment is used to know where to send the messages
88         * @throws Exception
89         */
[53]90        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
[44]91                this.uid = uid;
[53]92                this.broker = broker;
93                rListener = broker.getResponseListener();
94                dispatcher = broker.getEventDispatcher();
95                serializer = broker.getSerializer();
[44]96
97                // TODO what is better to use a new channel or to use the same?
98                // this.channel = Broker.getChannel();
[53]99                env = broker.getEnvironment();
[70]100                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
101                multiExchange = multi + exchange;
102                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
[44]103
[47]104                // set the serializer type
[62]105                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
[47]106
[44]107                listeners = new HashMap<String, EventListener<?>>();
108
109                // Create a new hashmap and registry it in rListener
110                results = new HashMap<String, byte[]>();
111                rListener.registerProxy(this);
112        }
113
114        @Override
115        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
116                // Local methods only
117                String methodName = method.getName();
118
119                // The local methods will be invoked here
120                if (method.getDeclaringClass().equals(Remote.class)) {
121                        if (methodName.equals("getRef")) {
122                                return getRef();
123                        } else if (methodName.equals("addListener")) {
124                                addListener((EventListener<?>) arguments[0]);
125                                return null;
126                        } else if (methodName.equals("removeListener")) {
127                                removeListener((EventListener<?>) arguments[0]);
128                                return null;
129                        } else if (methodName.equals("getListeners")) {
130                                return getListeners();
131                        }
132                }
133
134                // Create the request
135                Request request = createRequest(method, arguments);
136
137                Object response = null;
138                // Publish the request
139                if (request.isAsync()) {
[49]140                        logger.debug("Publish async request -> " + request.getId());
[70]141                        publishMessage(request, replyQueueName);
[44]142                } else {
[49]143                        logger.debug("Publish sync request -> " + request.getId());
[44]144                        response = publishSyncRequest(request, method.getReturnType());
145                }
146
147                return response;
148        }
149
150        private void publishMessage(Request request, String replyQueueName) throws Exception {
151                String corrId = request.getId();
152
153                // Get the environment properties
[55]154                String exchange;
155                String routingkey;
[44]156
[55]157                if (request.isMulti()) {
[70]158                        exchange = multiExchange;
[55]159                        routingkey = "";
160                } else {
[70]161                        exchange = this.exchange;
[55]162                        routingkey = uid;
163                }
164
[44]165                // Add the correlation ID and create a replyTo property
[47]166                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
[44]167
168                // Publish the message
[53]169                byte[] bytesRequest = serializer.serialize(serializerType, request);
170                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
[44]171        }
172
173        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
174                String corrId = request.getId();
175
176                int retries = request.getRetries();
177                long timeout = request.getTimeout();
178
179                // Publish the message
180                int i = 0;
181                while (i < retries) {
182                        try {
183                                publishMessage(request, replyQueueName);
[58]184                                if (request.isMulti()) {
185                                        return getResults(corrId, 2, timeout, type);
186                                } else {
187                                        return getResult(corrId, timeout, type);
188                                }
189
[44]190                        } catch (TimeoutException te) {
[49]191                                logger.error(te);
[44]192                        }
193                        i++;
194                }
195                throw new RetryException(retries, timeout);
196        }
197
198        private Request createRequest(Method method, Object[] arguments) {
199                String corrId = java.util.UUID.randomUUID().toString();
200                String methodName = method.getName();
[54]201                boolean multi = false;
[58]202                int wait = 0;
[44]203
[54]204                if (method.getAnnotation(MultiMethod.class) != null) {
205                        multi = true;
[58]206                        wait = method.getAnnotation(MultiMethod.class).waitNum();
[54]207                }
208
[44]209                // Since we need to know whether the method is async and if it has to
210                // return using an annotation, we'll only check the AsyncMethod
211                // annotation
212                if (method.getAnnotation(AsyncMethod.class) == null) {
213                        int retries = 1;
214                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
215                        if (method.getAnnotation(SyncMethod.class) != null) {
216                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
217                                retries = sync.retry();
218                                timeout = sync.timeout();
219                        }
[58]220                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
[44]221                } else {
[54]222                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
[44]223                }
224        }
225
226        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
227                Response resp = null;
228
229                // Wait for the results.
[58]230                long localTimeout = timeout;
[44]231                long start = System.currentTimeMillis();
232                synchronized (results) {
233                        // Due to we are using notifyAll(), we need to control the real time
234                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
[58]235                                results.wait(localTimeout);
[44]236                                localTimeout = System.currentTimeMillis() - start;
237                        }
238                        if ((timeout - localTimeout) <= 0) {
239                                throw new TimeoutException("Timeout exception time: " + timeout);
240                        }
[53]241                        resp = serializer.deserializeResponse(results.get(corrId), type);
[44]242
243                        // Remove and indicate the key exists (a hashmap can contain a null
244                        // object, using this we'll know whether a response has been
245                        // received before)
246                        results.put(corrId, null);
247                }
248
249                if (resp.getError() != null) {
250                        OmqException error = resp.getError();
251                        String name = error.getType();
252                        String message = error.getMessage();
253                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
254                }
255
256                return resp.getResult();
257        }
258
[57]259        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
260                Response resp = null;
[58]261                Class<?> actualType = type.getComponentType();
[57]262
[58]263                Object array = Array.newInstance(actualType, wait);
264
[57]265                int i = 0;
266                long localTimeout = timeout;
267                long start = System.currentTimeMillis();
268
269                while (i < wait) {
270                        synchronized (results) {
271                                // Due to we are using notifyAll(), we need to control the real
272                                // time
273                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
274                                        results.wait(localTimeout);
275                                        localTimeout = System.currentTimeMillis() - start;
276                                }
277                                if ((timeout - localTimeout) <= 0) {
278                                        throw new TimeoutException("Timeout exception time: " + timeout);
279                                }
280                                // Remove the corrId to receive new replies
[58]281                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
282                                Array.set(array, i, resp.getResult());
[57]283                        }
284                        i++;
285                }
286                synchronized (results) {
287                        results.put(corrId, null);
288                }
289
[58]290                return array;
[57]291        }
292
[44]293        /**
294         * Gets the Map used internally to retreive the response of the server
295         *
296         * @return a map with all the keys processed. Every key is a correlation id
297         *         of a method invoked remotely
298         */
299        public Map<String, byte[]> getResults() {
300                return results;
301        }
302
303        @Override
304        public String getRef() {
305                return uid;
306        }
307
308        @Override
309        public void notifyEvent(Event event) throws IOException, SerializerException {
310        }
311
312        @Override
313        public void addListener(EventListener<?> eventListener) throws Exception {
314                if (eventListener.getTopic() == null) {
315                        eventListener.setTopic(uid);
316                }
317                listeners.put(eventListener.getTopic(), eventListener);
318                dispatcher.addListener(eventListener);
319        }
320
321        @Override
322        public void removeListener(EventListener<?> eventListener) throws Exception {
323                listeners.remove(eventListener.getTopic());
324                dispatcher.removeListener(eventListener);
325        }
326
327        @Override
328        public Collection<EventListener<?>> getListeners() throws Exception {
329                return listeners.values();
330        }
331
332}
Note: See TracBrowser for help on using the repository browser.