source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 72

Last change on this file since 72 was 72, checked in by stoda, 11 years ago

Events deleted instead of them there's a new example of how to use the observer pattern

File size: 8.7 KB
Line 
1package omq.client.proxy;
2
3import java.lang.reflect.Array;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Properties;
9
10import omq.Remote;
11import omq.client.annotation.AsyncMethod;
12import omq.client.annotation.MultiMethod;
13import omq.client.annotation.SyncMethod;
14import omq.client.listener.ResponseListener;
15import omq.common.broker.Broker;
16import omq.common.message.Request;
17import omq.common.message.Response;
18import omq.common.util.ParameterQueue;
19import omq.common.util.Serializer;
20import omq.exception.OmqException;
21import omq.exception.RetryException;
22import omq.exception.TimeoutException;
23
24import org.apache.log4j.Logger;
25
26import com.rabbitmq.client.AMQP.BasicProperties;
27
28/**
29 * EvoProxy class. This class inherits from InvocationHandler and gives you a
30 * proxy with a server using an environment
31 *
32 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
33 *
34 */
35public class Proxymq implements InvocationHandler, Remote {
36
37        /**
38         *
39         */
40        private static final long serialVersionUID = 1L;
41        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
42        private static final String multi = "multi#";
43
44        private String uid;
45        private transient String exchange;
46        private transient String multiExchange;
47        private transient String replyQueueName;
48        private transient String serializerType;
49        private transient Broker broker;
50        private transient ResponseListener rListener;
51        private transient Serializer serializer;
52        private transient Properties env;
53        private transient Map<String, byte[]> results;
54
55        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
56        static {
57                primitiveClasses.put("byte", Byte.class);
58                primitiveClasses.put("short", Short.class);
59                primitiveClasses.put("char", Character.class);
60                primitiveClasses.put("int", Integer.class);
61                primitiveClasses.put("long", Long.class);
62                primitiveClasses.put("float", Float.class);
63                primitiveClasses.put("double", Double.class);
64        }
65
66        /**
67         * EvoProxy Constructor.
68         *
69         * This constructor uses an uid to know which object will call. It also uses
70         * Properties to set where to send the messages
71         *
72         * @param uid
73         *            The uid represents the unique identifier of a remote object
74         * @param clazz
75         *            It represents the real class of the remote object. With this
76         *            class the system can know the remoteInterface used and it can
77         *            also see which annotations are used
78         * @param env
79         *            The environment is used to know where to send the messages
80         * @throws Exception
81         */
82        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
83                this.uid = uid;
84                this.broker = broker;
85                rListener = broker.getResponseListener();
86                serializer = broker.getSerializer();
87
88                // TODO what is better to use a new channel or to use the same?
89                // this.channel = Broker.getChannel();
90                env = broker.getEnvironment();
91                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
92                multiExchange = multi + exchange;
93                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
94
95                // set the serializer type
96                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
97
98                // Create a new hashmap and registry it in rListener
99                results = new HashMap<String, byte[]>();
100                rListener.registerProxy(this);
101        }
102
103        @Override
104        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
105                // Local methods only
106                String methodName = method.getName();
107
108                // The local methods will be invoked here
109                if (method.getDeclaringClass().equals(Remote.class)) {
110                        if (methodName.equals("getRef")) {
111                                return getRef();
112                        }
113                }
114
115                // Create the request
116                Request request = createRequest(method, arguments);
117
118                Object response = null;
119                // Publish the request
120                if (request.isAsync()) {
121                        logger.debug("Publish async request -> " + request.getId());
122                        publishMessage(request, replyQueueName);
123                } else {
124                        logger.debug("Publish sync request -> " + request.getId());
125                        response = publishSyncRequest(request, method.getReturnType());
126                }
127
128                return response;
129        }
130
131        private void publishMessage(Request request, String replyQueueName) throws Exception {
132                String corrId = request.getId();
133
134                // Get the environment properties
135                String exchange;
136                String routingkey;
137
138                if (request.isMulti()) {
139                        exchange = multiExchange;
140                        routingkey = "";
141                } else {
142                        exchange = this.exchange;
143                        routingkey = uid;
144                }
145
146                // Add the correlation ID and create a replyTo property
147                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
148
149                // Publish the message
150                byte[] bytesRequest = serializer.serialize(serializerType, request);
151                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
152        }
153
154        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
155                String corrId = request.getId();
156
157                int retries = request.getRetries();
158                long timeout = request.getTimeout();
159
160                // Publish the message
161                int i = 0;
162                while (i < retries) {
163                        try {
164                                publishMessage(request, replyQueueName);
165                                if (request.isMulti()) {
166                                        return getResults(corrId, 2, timeout, type);
167                                } else {
168                                        return getResult(corrId, timeout, type);
169                                }
170
171                        } catch (TimeoutException te) {
172                                logger.error(te);
173                        }
174                        i++;
175                }
176                throw new RetryException(retries, timeout);
177        }
178
179        private Request createRequest(Method method, Object[] arguments) {
180                String corrId = java.util.UUID.randomUUID().toString();
181                String methodName = method.getName();
182                boolean multi = false;
183                int wait = 0;
184
185                if (method.getAnnotation(MultiMethod.class) != null) {
186                        multi = true;
187                        wait = method.getAnnotation(MultiMethod.class).waitNum();
188                }
189
190                // Since we need to know whether the method is async and if it has to
191                // return using an annotation, we'll only check the AsyncMethod
192                // annotation
193                if (method.getAnnotation(AsyncMethod.class) == null) {
194                        int retries = 1;
195                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
196                        if (method.getAnnotation(SyncMethod.class) != null) {
197                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
198                                retries = sync.retry();
199                                timeout = sync.timeout();
200                        }
201                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
202                } else {
203                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
204                }
205        }
206
207        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
208                Response resp = null;
209
210                // Wait for the results.
211                long localTimeout = timeout;
212                long start = System.currentTimeMillis();
213                synchronized (results) {
214                        // Due to we are using notifyAll(), we need to control the real time
215                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
216                                results.wait(localTimeout);
217                                localTimeout = System.currentTimeMillis() - start;
218                        }
219                        if ((timeout - localTimeout) <= 0) {
220                                throw new TimeoutException("Timeout exception time: " + timeout);
221                        }
222                        resp = serializer.deserializeResponse(results.get(corrId), type);
223
224                        // Remove and indicate the key exists (a hashmap can contain a null
225                        // object, using this we'll know whether a response has been
226                        // received before)
227                        results.put(corrId, null);
228                }
229
230                if (resp.getError() != null) {
231                        OmqException error = resp.getError();
232                        String name = error.getType();
233                        String message = error.getMessage();
234                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
235                }
236
237                return resp.getResult();
238        }
239
240        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
241                Response resp = null;
242                Class<?> actualType = type.getComponentType();
243
244                Object array = Array.newInstance(actualType, wait);
245
246                int i = 0;
247                long localTimeout = timeout;
248                long start = System.currentTimeMillis();
249
250                while (i < wait) {
251                        synchronized (results) {
252                                // Due to we are using notifyAll(), we need to control the real
253                                // time
254                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
255                                        results.wait(localTimeout);
256                                        localTimeout = System.currentTimeMillis() - start;
257                                }
258                                if ((timeout - localTimeout) <= 0) {
259                                        throw new TimeoutException("Timeout exception time: " + timeout);
260                                }
261                                // Remove the corrId to receive new replies
262                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
263                                Array.set(array, i, resp.getResult());
264                        }
265                        i++;
266                }
267                synchronized (results) {
268                        results.put(corrId, null);
269                }
270
271                return array;
272        }
273
274        /**
275         * Gets the Map used internally to retreive the response of the server
276         *
277         * @return a map with all the keys processed. Every key is a correlation id
278         *         of a method invoked remotely
279         */
280        public Map<String, byte[]> getResults() {
281                return results;
282        }
283
284        @Override
285        public String getRef() {
286                return uid;
287        }
288
289}
Note: See TracBrowser for help on using the repository browser.