source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 78

Last change on this file since 78 was 77, checked in by stoda, 11 years ago

ParameterQueues? changed, added some properties to modify the queues

File size: 9.1 KB
Line 
1package omq.client.proxy;
2
3import java.lang.reflect.Array;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Properties;
9
10import omq.Remote;
11import omq.client.annotation.AsyncMethod;
12import omq.client.annotation.MultiMethod;
13import omq.client.annotation.SyncMethod;
14import omq.client.listener.ResponseListener;
15import omq.common.broker.Broker;
16import omq.common.message.Request;
17import omq.common.message.Response;
18import omq.common.util.ParameterQueue;
19import omq.common.util.Serializer;
20import omq.exception.OmqException;
21import omq.exception.RetryException;
22import omq.exception.TimeoutException;
23
24import org.apache.log4j.Logger;
25
26import com.rabbitmq.client.AMQP.BasicProperties;
27
28/**
29 * EvoProxy class. This class inherits from InvocationHandler and gives you a
30 * proxy with a server using an environment
31 *
32 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
33 *
34 */
35public class Proxymq implements InvocationHandler, Remote {
36
37        /**
38         *
39         */
40        private static final long serialVersionUID = 1L;
41        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
42        private static final String multi = "multi#";
43
44        private String uid;
45        private transient String exchange;
46        private transient String multiExchange;
47        private transient String replyQueueName;
48        private transient String serializerType;
49        private transient Broker broker;
50        private transient ResponseListener rListener;
51        private transient Serializer serializer;
52        private transient Properties env;
53        private transient Integer deliveryMode = null;
54        private transient Map<String, byte[]> results;
55
56        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
57        static {
58                primitiveClasses.put("byte", Byte.class);
59                primitiveClasses.put("short", Short.class);
60                primitiveClasses.put("char", Character.class);
61                primitiveClasses.put("int", Integer.class);
62                primitiveClasses.put("long", Long.class);
63                primitiveClasses.put("float", Float.class);
64                primitiveClasses.put("double", Double.class);
65        }
66
67        /**
68         * EvoProxy Constructor.
69         *
70         * This constructor uses an uid to know which object will call. It also uses
71         * Properties to set where to send the messages
72         *
73         * @param uid
74         *            The uid represents the unique identifier of a remote object
75         * @param clazz
76         *            It represents the real class of the remote object. With this
77         *            class the system can know the remoteInterface used and it can
78         *            also see which annotations are used
79         * @param env
80         *            The environment is used to know where to send the messages
81         * @throws Exception
82         */
83        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
84                this.uid = uid;
85                this.broker = broker;
86                rListener = broker.getResponseListener();
87                serializer = broker.getSerializer();
88
89                // TODO what is better to use a new channel or to use the same?
90                // this.channel = Broker.getChannel();
91                env = broker.getEnvironment();
92                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
93                multiExchange = multi + uid;
94                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
95
96                // set the serializer type
97                serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA);
98                if (env.getProperty(ParameterQueue.DELIVERY_MODE) != null) {
99                        deliveryMode = Integer.parseInt(env.getProperty(ParameterQueue.DELIVERY_MODE));
100                }
101
102                // Create a new hashmap and registry it in rListener
103                results = new HashMap<String, byte[]>();
104                rListener.registerProxy(this);
105        }
106
107        @Override
108        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
109                // Local methods only
110                String methodName = method.getName();
111
112                // The local methods will be invoked here
113                if (method.getDeclaringClass().equals(Remote.class)) {
114                        if (methodName.equals("getRef")) {
115                                return getRef();
116                        }
117                }
118
119                // Create the request
120                Request request = createRequest(method, arguments);
121
122                Object response = null;
123                // Publish the request
124                if (request.isAsync()) {
125                        publishMessage(request, replyQueueName);
126                } else {
127                        response = publishSyncRequest(request, method.getReturnType());
128                }
129
130                return response;
131        }
132
133        private void publishMessage(Request request, String replyQueueName) throws Exception {
134                String corrId = request.getId();
135
136                // Get the environment properties
137                String exchange;
138                String routingkey;
139
140                if (request.isMulti()) {
141                        exchange = multiExchange;
142                        routingkey = "";
143                } else {
144                        exchange = this.exchange;
145                        routingkey = uid;
146                }
147
148                // Add the correlation ID and create a replyTo property
149                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType)
150                                .deliveryMode(deliveryMode).build();
151
152                // Publish the message
153                byte[] bytesRequest = serializer.serialize(serializerType, request);
154                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
155                logger.debug("Proxymq: " + uid + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: "
156                                + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()
157                                + ", delivery mode: " + deliveryMode);
158        }
159
160        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
161                String corrId = request.getId();
162
163                int retries = request.getRetries();
164                long timeout = request.getTimeout();
165
166                // Publish the message
167                int i = 0;
168                while (i < retries) {
169                        try {
170                                publishMessage(request, replyQueueName);
171                                if (request.isMulti()) {
172                                        return getResults(corrId, 2, timeout, type);
173                                } else {
174                                        return getResult(corrId, timeout, type);
175                                }
176
177                        } catch (TimeoutException te) {
178                                logger.error(te);
179                        }
180                        i++;
181                }
182                throw new RetryException(retries, timeout);
183        }
184
185        private Request createRequest(Method method, Object[] arguments) {
186                String corrId = java.util.UUID.randomUUID().toString();
187                String methodName = method.getName();
188                boolean multi = false;
189                int wait = 0;
190
191                if (method.getAnnotation(MultiMethod.class) != null) {
192                        multi = true;
193                        wait = method.getAnnotation(MultiMethod.class).waitNum();
194                }
195
196                // Since we need to know whether the method is async and if it has to
197                // return using an annotation, we'll only check the AsyncMethod
198                // annotation
199                if (method.getAnnotation(AsyncMethod.class) == null) {
200                        int retries = 1;
201                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
202                        if (method.getAnnotation(SyncMethod.class) != null) {
203                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
204                                retries = sync.retry();
205                                timeout = sync.timeout();
206                        }
207                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
208                } else {
209                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
210                }
211        }
212
213        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
214                Response resp = null;
215
216                // Wait for the results.
217                long localTimeout = timeout;
218                long start = System.currentTimeMillis();
219                synchronized (results) {
220                        // Due to we are using notifyAll(), we need to control the real time
221                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
222                                results.wait(localTimeout);
223                                localTimeout = System.currentTimeMillis() - start;
224                        }
225                        if ((timeout - localTimeout) <= 0) {
226                                throw new TimeoutException("Timeout exception time: " + timeout);
227                        }
228                        resp = serializer.deserializeResponse(results.get(corrId), type);
229
230                        // Remove and indicate the key exists (a hashmap can contain a null
231                        // object, using this we'll know whether a response has been
232                        // received before)
233                        results.put(corrId, null);
234                }
235
236                if (resp.getError() != null) {
237                        OmqException error = resp.getError();
238                        String name = error.getType();
239                        String message = error.getMessage();
240                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
241                }
242
243                return resp.getResult();
244        }
245
246        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
247                Response resp = null;
248                Class<?> actualType = type.getComponentType();
249
250                Object array = Array.newInstance(actualType, wait);
251
252                int i = 0;
253                long localTimeout = timeout;
254                long start = System.currentTimeMillis();
255
256                while (i < wait) {
257                        synchronized (results) {
258                                // Due to we are using notifyAll(), we need to control the real
259                                // time
260                                while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
261                                        results.wait(localTimeout);
262                                        localTimeout = System.currentTimeMillis() - start;
263                                }
264                                if ((timeout - localTimeout) <= 0) {
265                                        throw new TimeoutException("Timeout exception time: " + timeout);
266                                }
267                                // Remove the corrId to receive new replies
268                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
269                                Array.set(array, i, resp.getResult());
270                        }
271                        i++;
272                }
273                synchronized (results) {
274                        results.put(corrId, null);
275                }
276
277                return array;
278        }
279
280        /**
281         * Gets the Map used internally to retreive the response of the server
282         *
283         * @return a map with all the keys processed. Every key is a correlation id
284         *         of a method invoked remotely
285         */
286        public Map<String, byte[]> getResults() {
287                return results;
288        }
289
290        @Override
291        public String getRef() {
292                return uid;
293        }
294
295}
Note: See TracBrowser for help on using the repository browser.