source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 70

Last change on this file since 70 was 70, checked in by stoda, 11 years ago

MultiProxymq? added

File size: 10.1 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.Array;
5import java.lang.reflect.InvocationHandler;
6import java.lang.reflect.Method;
7import java.util.Collection;
8import java.util.HashMap;
9import java.util.Map;
10import java.util.Properties;
11
12import omq.Remote;
13import omq.client.annotation.AsyncMethod;
14import omq.client.annotation.MultiMethod;
15import omq.client.annotation.SyncMethod;
16import omq.client.listener.ResponseListener;
17import omq.common.broker.Broker;
18import omq.common.event.Event;
19import omq.common.event.EventDispatcher;
20import omq.common.event.EventListener;
21import omq.common.message.Request;
22import omq.common.message.Response;
23import omq.common.util.ParameterQueue;
24import omq.common.util.Serializer;
25import omq.exception.OmqException;
26import omq.exception.RetryException;
27import omq.exception.SerializerException;
28import omq.exception.TimeoutException;
29
30import org.apache.log4j.Logger;
31
32import com.rabbitmq.client.AMQP.BasicProperties;
33
34/**
35 * EvoProxy class. This class inherits from InvocationHandler and gives you a
36 * proxy with a server using an environment
37 *
38 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
39 *
40 */
41public class Proxymq implements InvocationHandler, Remote {
42
43        /**
44         *
45         */
46        private static final long serialVersionUID = 1L;
47        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
48        private static final String multi = "multi#";
49
50        private String uid;
51        private transient String exchange;
52        private transient String multiExchange;
53        private transient String replyQueueName;
54        private transient String serializerType;
55        private transient Broker broker;
56        private transient ResponseListener rListener;
57        private transient EventDispatcher dispatcher;
58        private transient Serializer serializer;
59        private transient Properties env;
60        private transient Map<String, byte[]> results;
61        private transient Map<String, EventListener<?>> listeners;
62
63        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
64        static {
65                primitiveClasses.put("byte", Byte.class);
66                primitiveClasses.put("short", Short.class);
67                primitiveClasses.put("char", Character.class);
68                primitiveClasses.put("int", Integer.class);
69                primitiveClasses.put("long", Long.class);
70                primitiveClasses.put("float", Float.class);
71                primitiveClasses.put("double", Double.class);
72        }
73
74        /**
75         * EvoProxy Constructor.
76         *
77         * This constructor uses an uid to know which object will call. It also uses
78         * Properties to set where to send the messages
79         *
80         * @param uid
81         *            The uid represents the unique identifier of a remote object
82         * @param clazz
83         *            It represents the real class of the remote object. With this
84         *            class the system can know the remoteInterface used and it can
85         *            also see which annotations are used
86         * @param env
87         *            The environment is used to know where to send the messages
88         * @throws Exception
89         */
90        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
91                this.uid = uid;
92                this.broker = broker;
93                rListener = broker.getResponseListener();
94                dispatcher = broker.getEventDispatcher();
95                serializer = broker.getSerializer();
96
97                // TODO what is better to use a new channel or to use the same?
98                // this.channel = Broker.getChannel();
99                env = broker.getEnvironment();
100                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
101                multiExchange = multi + exchange;
102                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
103
104                // set the serializer type
105                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
106
107                listeners = new HashMap<String, EventListener<?>>();
108
109                // Create a new hashmap and registry it in rListener
110                results = new HashMap<String, byte[]>();
111                rListener.registerProxy(this);
112        }
113
114        @Override
115        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
116                // Local methods only
117                String methodName = method.getName();
118
119                // The local methods will be invoked here
120                if (method.getDeclaringClass().equals(Remote.class)) {
121                        if (methodName.equals("getRef")) {
122                                return getRef();
123                        } else if (methodName.equals("addListener")) {
124                                addListener((EventListener<?>) arguments[0]);
125                                return null;
126                        } else if (methodName.equals("removeListener")) {
127                                removeListener((EventListener<?>) arguments[0]);
128                                return null;
129                        } else if (methodName.equals("getListeners")) {
130                                return getListeners();
131                        }
132                }
133
134                // Create the request
135                Request request = createRequest(method, arguments);
136
137                Object response = null;
138                // Publish the request
139                if (request.isAsync()) {
140                        logger.debug("Publish async request -> " + request.getId());
141                        publishMessage(request, replyQueueName);
142                } else {
143                        logger.debug("Publish sync request -> " + request.getId());
144                        response = publishSyncRequest(request, method.getReturnType());
145                }
146
147                return response;
148        }
149
150        private void publishMessage(Request request, String replyQueueName) throws Exception {
151                String corrId = request.getId();
152
153                // Get the environment properties
154                String exchange;
155                String routingkey;
156
157                if (request.isMulti()) {
158                        exchange = multiExchange;
159                        routingkey = "";
160                } else {
161                        exchange = this.exchange;
162                        routingkey = uid;
163                }
164
165                // Add the correlation ID and create a replyTo property
166                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
167
168                // Publish the message
169                byte[] bytesRequest = serializer.serialize(serializerType, request);
170                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
171        }
172
173        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
174                String corrId = request.getId();
175
176                int retries = request.getRetries();
177                long timeout = request.getTimeout();
178
179                // Publish the message
180                int i = 0;
181                while (i < retries) {
182                        try {
183                                publishMessage(request, replyQueueName);
184                                if (request.isMulti()) {
185                                        return getResults(corrId, 2, timeout, type);
186                                } else {
187                                        return getResult(corrId, timeout, type);
188                                }
189
190                        } catch (TimeoutException te) {
191                                logger.error(te);
192                        }
193                        i++;
194                }
195                throw new RetryException(retries, timeout);
196        }
197
198        private Request createRequest(Method method, Object[] arguments) {
199                String corrId = java.util.UUID.randomUUID().toString();
200                String methodName = method.getName();
201                boolean multi = false;
202                int wait = 0;
203
204                if (method.getAnnotation(MultiMethod.class) != null) {
205                        multi = true;
206                        wait = method.getAnnotation(MultiMethod.class).waitNum();
207                }
208
209                // Since we need to know whether the method is async and if it has to
210                // return using an annotation, we'll only check the AsyncMethod
211                // annotation
212                if (method.getAnnotation(AsyncMethod.class) == null) {
213                        int retries = 1;
214                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
215                        if (method.getAnnotation(SyncMethod.class) != null) {
216                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
217                                retries = sync.retry();
218                                timeout = sync.timeout();
219                        }
220                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
221                } else {
222                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
223                }
224        }
225
226        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
227                Response resp = null;
228
229                // Wait for the results.
230                long localTimeout = timeout;
231                long start = System.currentTimeMillis();
232                synchronized (results) {
233                        // Due to we are using notifyAll(), we need to control the real time
234                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
235                                results.wait(localTimeout);
236                                localTimeout = System.currentTimeMillis() - start;
237                        }
238                        if ((timeout - localTimeout) <= 0) {
239                                throw new TimeoutException("Timeout exception time: " + timeout);
240                        }
241                        resp = serializer.deserializeResponse(results.get(corrId), type);
242
243                        // Remove and indicate the key exists (a hashmap can contain a null
244                        // object, using this we'll know whether a response has been
245                        // received before)
246                        results.put(corrId, null);
247                }
248
249                if (resp.getError() != null) {
250                        OmqException error = resp.getError();
251                        String name = error.getType();
252                        String message = error.getMessage();
253                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
254                }
255
256                return resp.getResult();
257        }
258
259        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
260                Response resp = null;
261                Class<?> actualType = type.getComponentType();
262
263                Object array = Array.newInstance(actualType, wait);
264
265                int i = 0;
266                long localTimeout = timeout;
267                long start = System.currentTimeMillis();
268
269                while (i < wait) {
270                        synchronized (results) {
271                                // Due to we are using notifyAll(), we need to control the real
272                                // time
273                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
274                                        results.wait(localTimeout);
275                                        localTimeout = System.currentTimeMillis() - start;
276                                }
277                                if ((timeout - localTimeout) <= 0) {
278                                        throw new TimeoutException("Timeout exception time: " + timeout);
279                                }
280                                // Remove the corrId to receive new replies
281                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
282                                Array.set(array, i, resp.getResult());
283                        }
284                        i++;
285                }
286                synchronized (results) {
287                        results.put(corrId, null);
288                }
289
290                return array;
291        }
292
293        /**
294         * Gets the Map used internally to retreive the response of the server
295         *
296         * @return a map with all the keys processed. Every key is a correlation id
297         *         of a method invoked remotely
298         */
299        public Map<String, byte[]> getResults() {
300                return results;
301        }
302
303        @Override
304        public String getRef() {
305                return uid;
306        }
307
308        @Override
309        public void notifyEvent(Event event) throws IOException, SerializerException {
310        }
311
312        @Override
313        public void addListener(EventListener<?> eventListener) throws Exception {
314                if (eventListener.getTopic() == null) {
315                        eventListener.setTopic(uid);
316                }
317                listeners.put(eventListener.getTopic(), eventListener);
318                dispatcher.addListener(eventListener);
319        }
320
321        @Override
322        public void removeListener(EventListener<?> eventListener) throws Exception {
323                listeners.remove(eventListener.getTopic());
324                dispatcher.removeListener(eventListener);
325        }
326
327        @Override
328        public Collection<EventListener<?>> getListeners() throws Exception {
329                return listeners.values();
330        }
331
332}
Note: See TracBrowser for help on using the repository browser.