source: branches/objectmq-1.0/src/omq/client/proxy/Proxymq.java @ 33

Last change on this file since 33 was 33, checked in by amoreno, 11 years ago

new release version

File size: 10.2 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.lang.reflect.Proxy;
7import java.util.Collection;
8import java.util.Date;
9import java.util.HashMap;
10import java.util.Hashtable;
11import java.util.Map;
12import java.util.Properties;
13
14import omq.Remote;
15import omq.client.annotation.AsyncMethod;
16import omq.client.annotation.SyncMethod;
17import omq.client.remote.response.ResponseListener;
18import omq.common.event.Event;
19import omq.common.event.EventDispatcher;
20import omq.common.event.EventListener;
21import omq.common.message.Request;
22import omq.common.message.Response;
23import omq.common.util.ParameterQueue;
24import omq.common.util.Serializer;
25import omq.exception.NoContainsInstanceException;
26import omq.exception.RetryException;
27import omq.exception.SerializerException;
28import omq.exception.TimeoutException;
29
30import com.rabbitmq.client.AMQP.BasicProperties;
31import com.rabbitmq.client.Channel;
32
33/**
34 * EvoProxy class. This class inherits from InvocationHandler and gives you a
35 * proxy with a server using an environment
36 *
37 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
38 *
39 */
40public class Proxymq implements InvocationHandler, Remote {
41
42        /**
43         *
44         */
45        private static final long serialVersionUID = 1L;
46        private static Map<String, Object> proxies = new Hashtable<String, Object>();
47
48        private String uid;
49        private transient ResponseListener rListener;
50        private transient EventDispatcher dispatcher;
51        private transient Channel channel;
52        private transient Properties env;
53        private transient Map<String, byte[]> results;
54        private transient Map<String, EventListener> listeners;
55
56        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
57        static {
58                primitiveClasses.put("byte", Byte.class);
59                primitiveClasses.put("short", Short.class);
60                primitiveClasses.put("char", Character.class);
61                primitiveClasses.put("int", Integer.class);
62                primitiveClasses.put("long", Long.class);
63                primitiveClasses.put("float", Float.class);
64                primitiveClasses.put("double", Double.class);
65        }
66
67        /**
68         * EvoProxy Constructor.
69         *
70         * This constructor uses an uid to know which object will call. It also uses
71         * Properties to set where to send the messages
72         *
73         * @param uid
74         *            The uid represents the unique identifier of a remote object
75         * @param clazz
76         *            It represents the real class of the remote object. With this
77         *            class the system can know the remoteInterface used and it can
78         *            also see which annotations are used
79         * @param env
80         *            The environment is used to know where to send the messages
81         * @throws Exception
82         */
83        public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception {
84                this.uid = uid;
85                this.rListener = ResponseListener.getRequestListener();
86                this.dispatcher = EventDispatcher.getDispatcher();
87
88                this.channel = rListener.getChannel();
89                this.env = env;
90
91                listeners = new HashMap<String, EventListener>();
92
93                // Create a new hashmap and registry it in rListener
94                results = new HashMap<String, byte[]>();
95                rListener.registerProxy(this);
96        }
97
98        @Override
99        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
100                //long timeStart = (new Date()).getTime();
101               
102                // Local methods only
103                String methodName = method.getName();
104
105                // The local methods will be invoked here
106                if (method.getDeclaringClass().equals(Remote.class)) {
107                        if (methodName.equals("getRef")) {
108                                return getRef();
109                        } else if (methodName.equals("addListener")) {
110                                addListener((EventListener) arguments[0]);
111                                return null;
112                        } else if (methodName.equals("removeListener")) {
113                                removeListener((EventListener) arguments[0]);
114                                return null;
115                        } else if (methodName.equals("getListeners")) {
116                                return getListeners();
117                        }
118                }
119
120                // Create the request
121                Request request = createRequest(method, arguments);
122
123                //Log.saveTimeSendRequestLog("Client-time-request", request.getId(), method.getName(), timeStart);
124               
125                Object response = null;
126                // Publish the request
127                if (request.isAsync()) {
128                        System.out.println("Publish async request -> " + request.getId());
129                        publishAsyncRequest(request);
130                } else {
131                        System.out.println("Publish sync request -> " + request.getId());                       
132                        response = publishSyncRequest(request, method.getReturnType());
133                       
134                        //long timeEnd = (new Date()).getTime();
135                        //Log.saveTimeSendRequestLog("Client-time-response", request.getId(), method.getName(), timeEnd);
136                }
137               
138                return response;
139        }
140
141        private void publishMessage(Request request, String replyQueueName) throws IOException, SerializerException {
142                String corrId = request.getId();
143
144                // Get the environment properties
145                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
146                String routingkey = this.uid;
147
148                // Add the correlation ID and create a replyTo property
149                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).build();
150
151                // Publish the message
152                byte[] bytesRequest = Serializer.serialize(request);
153                channel.basicPublish(exchange, routingkey, props, bytesRequest);
154                //Log.saveLog("Client-Serialize", bytesRequest);
155        }
156
157        private void publishAsyncRequest(Request request) throws IOException, SerializerException {
158                // Get the environment properties
159                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
160                publishMessage(request, replyQueueName);
161        }
162
163        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
164                String corrId = request.getId();
165
166                int retries = request.getRetries();
167                long timeout = request.getTimeout();
168
169                // Get the environment properties
170                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
171
172                // Publish the message
173                int i = 0;
174                while (i < retries) {
175                        try {
176                                publishMessage(request, replyQueueName);
177                                return getResult(corrId, timeout, type);
178                        } catch (TimeoutException te) {
179                                System.out.println("Timeout exception catched " + te);
180                                te.printStackTrace();
181                        }
182                        i++;
183                }
184                throw new RetryException(retries, timeout);
185        }
186
187        private Request createRequest(Method method, Object[] arguments) {
188                String corrId = java.util.UUID.randomUUID().toString();
189                String methodName = method.getName();
190
191                // Since we need to know whether the method is async and if it has to
192                // return using an annotation, we'll only check the AsyncMethod
193                // annotation
194                if (method.getAnnotation(AsyncMethod.class) == null) {
195                        int retries = 1;
196                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
197                        if (method.getAnnotation(SyncMethod.class) != null) {
198                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
199                                retries = sync.retry();
200                                timeout = sync.timeout();
201                        }
202                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout);
203                } else {
204                        return Request.newAsyncRequest(corrId, methodName, arguments);
205                }
206        }
207
208        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
209                Response resp = null;
210
211                // Wait for the results.
212                long localTimeout = 0;
213                long start = System.currentTimeMillis();
214                synchronized (results) {
215                        // Due to we are using notifyAll(), we need to control the real time
216                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
217                                results.wait(timeout);
218                                localTimeout = System.currentTimeMillis() - start;
219                        }
220                        if ((timeout - localTimeout) <= 0) {
221                                throw new TimeoutException("Timeout exception time: " + timeout);
222                        }
223                        resp = Serializer.deserializeResponse(results.get(corrId), type);
224                        //Log.saveLog("Client-Deserialize", results.get(corrId));
225                       
226                        // Remove and indicate the key exists (a hashmap can contain a null
227                        // object, using this we'll know whether a response has been
228                        // received before)
229                        results.put(corrId, null);
230                }
231
232                return resp.getResult();
233        }
234
235        /**
236         *
237         * @param reference
238         *            RemoteObject reference
239         * @return true if the proxy has been created before or false in the other
240         *         case
241         */
242        public static boolean containsProxy(String reference) {
243                return proxies.containsKey(reference);
244        }
245
246        /**
247         *
248         * @param reference
249         *            RemoteObject reference
250         * @return a proxy instance
251         * @throws NoContainsInstanceException
252         */
253        public static Object getInstance(String reference) throws NoContainsInstanceException {
254                if (!containsProxy(reference)) {
255                        throw new NoContainsInstanceException(reference);
256                }
257                return proxies.get(reference);
258        }
259
260        /**
261         * Returns an instance of a proxy class for the specified interfaces that
262         * dispatches method invocations to the specified invocation handler. * @param
263         * loader
264         *
265         * @param loader
266         *            the class loader to define the proxy class
267         *
268         * @param interfaces
269         *            the list of interfaces for the proxy class to implement
270         * @param proxy
271         *            the invocation handler to dispatch method invocations to
272         * @return a proxy instance with the specified invocation handler of a proxy
273         *         class that is defined by the specified class loader and that
274         *         implements the specified interfaces
275         */
276        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
277                if (proxies.containsKey(proxy.getRef())) {
278                        System.out.println("Proxy trobat");
279                        return proxies.get(proxy.getRef());
280                }
281                Object value = Proxy.newProxyInstance(loader, interfaces, proxy);
282                proxies.put(proxy.getRef(), value);
283                return value;
284        }
285
286        /**
287         * Gets the Map used internally to retreive the response of the server
288         *
289         * @return a map with all the keys processed. Every key is a correlation id
290         *         of a method invoked remotely
291         */
292        public Map<String, byte[]> getResults() {
293                return results;
294        }
295
296        @Override
297        public String getRef() {
298                return uid;
299        }
300
301        @Override
302        public void notifyEvent(Event event) throws IOException, SerializerException {
303        }
304
305        @Override
306        public void addListener(EventListener eventListener) throws Exception {
307                if (eventListener.getTopic() == null) {
308                        eventListener.setTopic(uid);
309                }
310                listeners.put(eventListener.getTopic(), eventListener);
311                dispatcher.addListener(eventListener);
312        }
313
314        @Override
315        public void removeListener(EventListener eventListener) throws Exception {
316                listeners.remove(eventListener.getTopic());
317                dispatcher.removeListener(eventListener);
318        }
319
320        @Override
321        public Collection<EventListener> getListeners() throws Exception {
322                return listeners.values();
323        }
324
325}
Note: See TracBrowser for help on using the repository browser.