source: branches/objectmq_old/src/omq/client/proxy/Proxymq.java @ 41

Last change on this file since 41 was 39, checked in by stoda, 11 years ago

Exception test revised.
Broker.lookup does not need the casting
GsonImp? arguments problem solved
MultiProcessTest? added

File size: 10.6 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.lang.reflect.Proxy;
7import java.util.Collection;
8import java.util.HashMap;
9import java.util.Hashtable;
10import java.util.Map;
11import java.util.Properties;
12
13import omq.Remote;
14import omq.client.annotation.AsyncMethod;
15import omq.client.annotation.SyncMethod;
16import omq.client.listener.ResponseListener;
17import omq.common.broker.Broker;
18import omq.common.event.Event;
19import omq.common.event.EventDispatcher;
20import omq.common.event.EventListener;
21import omq.common.message.Request;
22import omq.common.message.Response;
23import omq.common.util.ParameterQueue;
24import omq.common.util.Serializer;
25import omq.exception.NoContainsInstanceException;
26import omq.exception.OmqException;
27import omq.exception.RetryException;
28import omq.exception.SerializerException;
29import omq.exception.TimeoutException;
30
31import com.rabbitmq.client.AMQP.BasicProperties;
32
33/**
34 * EvoProxy class. This class inherits from InvocationHandler and gives you a
35 * proxy with a server using an environment
36 *
37 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
38 *
39 */
40public class Proxymq implements InvocationHandler, Remote {
41
42        /**
43         *
44         */
45        private static final long serialVersionUID = 1L;
46        private static Map<String, Object> proxies = new Hashtable<String, Object>();
47
48        private String uid;
49        private transient ResponseListener rListener;
50        private transient EventDispatcher dispatcher;
51        // private transient Channel channel;
52        private transient Properties env;
53        private transient Map<String, byte[]> results;
54        private transient Map<String, EventListener<?>> listeners;
55
56        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
57        static {
58                primitiveClasses.put("byte", Byte.class);
59                primitiveClasses.put("short", Short.class);
60                primitiveClasses.put("char", Character.class);
61                primitiveClasses.put("int", Integer.class);
62                primitiveClasses.put("long", Long.class);
63                primitiveClasses.put("float", Float.class);
64                primitiveClasses.put("double", Double.class);
65        }
66
67        /**
68         * EvoProxy Constructor.
69         *
70         * This constructor uses an uid to know which object will call. It also uses
71         * Properties to set where to send the messages
72         *
73         * @param uid
74         *            The uid represents the unique identifier of a remote object
75         * @param clazz
76         *            It represents the real class of the remote object. With this
77         *            class the system can know the remoteInterface used and it can
78         *            also see which annotations are used
79         * @param env
80         *            The environment is used to know where to send the messages
81         * @throws Exception
82         */
83        public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception {
84                this.uid = uid;
85                this.rListener = ResponseListener.getRequestListener();
86                this.dispatcher = EventDispatcher.getDispatcher();
87
88                // TODO what is better to use a new channel or to use the same?
89                // this.channel = Broker.getChannel();
90                this.env = env;
91
92                listeners = new HashMap<String, EventListener<?>>();
93
94                // Create a new hashmap and registry it in rListener
95                results = new HashMap<String, byte[]>();
96                rListener.registerProxy(this);
97        }
98
99        @Override
100        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
101                // long timeStart = (new Date()).getTime();
102
103                // Local methods only
104                String methodName = method.getName();
105
106                // The local methods will be invoked here
107                if (method.getDeclaringClass().equals(Remote.class)) {
108                        if (methodName.equals("getRef")) {
109                                return getRef();
110                        } else if (methodName.equals("addListener")) {
111                                addListener((EventListener<?>) arguments[0]);
112                                return null;
113                        } else if (methodName.equals("removeListener")) {
114                                removeListener((EventListener<?>) arguments[0]);
115                                return null;
116                        } else if (methodName.equals("getListeners")) {
117                                return getListeners();
118                        }
119                }
120
121                // Create the request
122                Request request = createRequest(method, arguments);
123
124                // Log.saveTimeSendRequestLog("Client-time-request", request.getId(),
125                // method.getName(), timeStart);
126
127                Object response = null;
128                // Publish the request
129                if (request.isAsync()) {
130                        System.out.println("Publish async request -> " + request.getId());
131                        publishAsyncRequest(request);
132                } else {
133                        System.out.println("Publish sync request -> " + request.getId());
134                        response = publishSyncRequest(request, method.getReturnType());
135
136                        // long timeEnd = (new Date()).getTime();
137                        // Log.saveTimeSendRequestLog("Client-time-response",
138                        // request.getId(), method.getName(), timeEnd);
139                }
140
141                return response;
142        }
143
144        private void publishMessage(Request request, String replyQueueName) throws Exception {
145                String corrId = request.getId();
146
147                // Get the environment properties
148                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
149                String routingkey = this.uid;
150
151                // Add the correlation ID and create a replyTo property
152                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).build();
153
154                // Publish the message
155                byte[] bytesRequest = Serializer.serialize(request);
156                // TODO See this
157                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
158                Broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
159                // Log.saveLog("Client-Serialize", bytesRequest);
160        }
161
162        private void publishAsyncRequest(Request request) throws Exception {
163                // Get the environment properties
164                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
165                publishMessage(request, replyQueueName);
166        }
167
168        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
169                String corrId = request.getId();
170
171                int retries = request.getRetries();
172                long timeout = request.getTimeout();
173
174                // Get the environment properties
175                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
176
177                // Publish the message
178                int i = 0;
179                while (i < retries) {
180                        try {
181                                publishMessage(request, replyQueueName);
182                                return getResult(corrId, timeout, type);
183                        } catch (TimeoutException te) {
184                                System.out.println("Timeout exception catched " + te);
185                                te.printStackTrace();
186                        }
187                        i++;
188                }
189                throw new RetryException(retries, timeout);
190        }
191
192        private Request createRequest(Method method, Object[] arguments) {
193                String corrId = java.util.UUID.randomUUID().toString();
194                String methodName = method.getName();
195
196                // Since we need to know whether the method is async and if it has to
197                // return using an annotation, we'll only check the AsyncMethod
198                // annotation
199                if (method.getAnnotation(AsyncMethod.class) == null) {
200                        int retries = 1;
201                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
202                        if (method.getAnnotation(SyncMethod.class) != null) {
203                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
204                                retries = sync.retry();
205                                timeout = sync.timeout();
206                        }
207                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout);
208                } else {
209                        return Request.newAsyncRequest(corrId, methodName, arguments);
210                }
211        }
212
213        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
214                Response resp = null;
215
216                // Wait for the results.
217                long localTimeout = 0;
218                long start = System.currentTimeMillis();
219                synchronized (results) {
220                        // Due to we are using notifyAll(), we need to control the real time
221                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
222                                results.wait(timeout);
223                                localTimeout = System.currentTimeMillis() - start;
224                        }
225                        if ((timeout - localTimeout) <= 0) {
226                                throw new TimeoutException("Timeout exception time: " + timeout);
227                        }
228                        resp = Serializer.deserializeResponse(results.get(corrId), type);
229                        // Log.saveLog("Client-Deserialize", results.get(corrId));
230
231                        // Remove and indicate the key exists (a hashmap can contain a null
232                        // object, using this we'll know whether a response has been
233                        // received before)
234                        results.put(corrId, null);
235                }
236
237                if (resp.getError() != null) {
238                        OmqException error = resp.getError();
239                        String name = error.getType();
240                        String message = error.getMessage();
241                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
242                }
243
244                return resp.getResult();
245        }
246
247        /**
248         *
249         * @param reference
250         *            RemoteObject reference
251         * @return true if the proxy has been created before or false in the other
252         *         case
253         */
254        public static boolean containsProxy(String reference) {
255                return proxies.containsKey(reference);
256        }
257
258        /**
259         *
260         * @param reference
261         *            RemoteObject reference
262         * @return a proxy instance
263         * @throws NoContainsInstanceException
264         */
265        public static Object getInstance(String reference) throws NoContainsInstanceException {
266                if (!containsProxy(reference)) {
267                        throw new NoContainsInstanceException(reference);
268                }
269                return proxies.get(reference);
270        }
271
272        /**
273         * Returns an instance of a proxy class for the specified interfaces that
274         * dispatches method invocations to the specified invocation handler. * @param
275         * loader
276         *
277         * @param loader
278         *            the class loader to define the proxy class
279         *
280         * @param interfaces
281         *            the list of interfaces for the proxy class to implement
282         * @param proxy
283         *            the invocation handler to dispatch method invocations to
284         * @return a proxy instance with the specified invocation handler of a proxy
285         *         class that is defined by the specified class loader and that
286         *         implements the specified interfaces
287         */
288        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
289                if (proxies.containsKey(proxy.getRef())) {
290                        System.out.println("Proxy trobat");
291                        return proxies.get(proxy.getRef());
292                }
293                Object value = Proxy.newProxyInstance(loader, interfaces, proxy);
294                proxies.put(proxy.getRef(), value);
295                return value;
296        }
297
298        /**
299         * Gets the Map used internally to retreive the response of the server
300         *
301         * @return a map with all the keys processed. Every key is a correlation id
302         *         of a method invoked remotely
303         */
304        public Map<String, byte[]> getResults() {
305                return results;
306        }
307
308        @Override
309        public String getRef() {
310                return uid;
311        }
312
313        @Override
314        public void notifyEvent(Event event) throws IOException, SerializerException {
315        }
316
317        @Override
318        public void addListener(EventListener<?> eventListener) throws Exception {
319                if (eventListener.getTopic() == null) {
320                        eventListener.setTopic(uid);
321                }
322                listeners.put(eventListener.getTopic(), eventListener);
323                dispatcher.addListener(eventListener);
324        }
325
326        @Override
327        public void removeListener(EventListener<?> eventListener) throws Exception {
328                listeners.remove(eventListener.getTopic());
329                dispatcher.removeListener(eventListener);
330        }
331
332        @Override
333        public Collection<EventListener<?>> getListeners() throws Exception {
334                return listeners.values();
335        }
336
337}
Note: See TracBrowser for help on using the repository browser.