source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 44

Last change on this file since 44 was 44, checked in by stoda, 11 years ago

Objectmq converted to maven project

File size: 10.6 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.lang.reflect.Proxy;
7import java.util.Collection;
8import java.util.HashMap;
9import java.util.Hashtable;
10import java.util.Map;
11import java.util.Properties;
12
13import omq.Remote;
14import omq.client.annotation.AsyncMethod;
15import omq.client.annotation.SyncMethod;
16import omq.client.listener.ResponseListener;
17import omq.common.broker.Broker;
18import omq.common.event.Event;
19import omq.common.event.EventDispatcher;
20import omq.common.event.EventListener;
21import omq.common.message.Request;
22import omq.common.message.Response;
23import omq.common.util.ParameterQueue;
24import omq.common.util.Serializer;
25import omq.exception.NoContainsInstanceException;
26import omq.exception.OmqException;
27import omq.exception.RetryException;
28import omq.exception.SerializerException;
29import omq.exception.TimeoutException;
30
31import com.rabbitmq.client.AMQP.BasicProperties;
32
33/**
34 * EvoProxy class. This class inherits from InvocationHandler and gives you a
35 * proxy with a server using an environment
36 *
37 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
38 *
39 */
40public class Proxymq implements InvocationHandler, Remote {
41
42        /**
43         *
44         */
45        private static final long serialVersionUID = 1L;
46        private static Map<String, Object> proxies = new Hashtable<String, Object>();
47
48        private String uid;
49        private transient ResponseListener rListener;
50        private transient EventDispatcher dispatcher;
51        // private transient Channel channel;
52        private transient Properties env;
53        private transient Map<String, byte[]> results;
54        private transient Map<String, EventListener<?>> listeners;
55
56        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
57        static {
58                primitiveClasses.put("byte", Byte.class);
59                primitiveClasses.put("short", Short.class);
60                primitiveClasses.put("char", Character.class);
61                primitiveClasses.put("int", Integer.class);
62                primitiveClasses.put("long", Long.class);
63                primitiveClasses.put("float", Float.class);
64                primitiveClasses.put("double", Double.class);
65        }
66
67        /**
68         * EvoProxy Constructor.
69         *
70         * This constructor uses an uid to know which object will call. It also uses
71         * Properties to set where to send the messages
72         *
73         * @param uid
74         *            The uid represents the unique identifier of a remote object
75         * @param clazz
76         *            It represents the real class of the remote object. With this
77         *            class the system can know the remoteInterface used and it can
78         *            also see which annotations are used
79         * @param env
80         *            The environment is used to know where to send the messages
81         * @throws Exception
82         */
83        public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception {
84                this.uid = uid;
85                this.rListener = ResponseListener.getRequestListener();
86                this.dispatcher = EventDispatcher.getDispatcher();
87
88                // TODO what is better to use a new channel or to use the same?
89                // this.channel = Broker.getChannel();
90                this.env = env;
91
92                listeners = new HashMap<String, EventListener<?>>();
93
94                // Create a new hashmap and registry it in rListener
95                results = new HashMap<String, byte[]>();
96                rListener.registerProxy(this);
97        }
98
99        @Override
100        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
101                // long timeStart = (new Date()).getTime();
102
103                // Local methods only
104                String methodName = method.getName();
105
106                // The local methods will be invoked here
107                if (method.getDeclaringClass().equals(Remote.class)) {
108                        if (methodName.equals("getRef")) {
109                                return getRef();
110                        } else if (methodName.equals("addListener")) {
111                                addListener((EventListener<?>) arguments[0]);
112                                return null;
113                        } else if (methodName.equals("removeListener")) {
114                                removeListener((EventListener<?>) arguments[0]);
115                                return null;
116                        } else if (methodName.equals("getListeners")) {
117                                return getListeners();
118                        }
119                }
120
121                // Create the request
122                Request request = createRequest(method, arguments);
123
124                // Log.saveTimeSendRequestLog("Client-time-request", request.getId(),
125                // method.getName(), timeStart);
126
127                Object response = null;
128                // Publish the request
129                if (request.isAsync()) {
130                        System.out.println("Publish async request -> " + request.getId());
131                        publishAsyncRequest(request);
132                } else {
133                        System.out.println("Publish sync request -> " + request.getId());
134                        response = publishSyncRequest(request, method.getReturnType());
135
136                        // long timeEnd = (new Date()).getTime();
137                        // Log.saveTimeSendRequestLog("Client-time-response",
138                        // request.getId(), method.getName(), timeEnd);
139                }
140
141                return response;
142        }
143
144        private void publishMessage(Request request, String replyQueueName) throws Exception {
145                String corrId = request.getId();
146
147                // Get the environment properties
148                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
149                String routingkey = this.uid;
150
151                // Add the correlation ID and create a replyTo property
152                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).build();
153
154                // Publish the message
155                byte[] bytesRequest = Serializer.serialize(request);
156                // TODO See this
157                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
158                Broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
159                // Log.saveLog("Client-Serialize", bytesRequest);
160        }
161
162        private void publishAsyncRequest(Request request) throws Exception {
163                // Get the environment properties
164                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
165                publishMessage(request, replyQueueName);
166        }
167
168        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
169                String corrId = request.getId();
170
171                int retries = request.getRetries();
172                long timeout = request.getTimeout();
173
174                // Get the environment properties
175                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
176
177                // Publish the message
178                int i = 0;
179                while (i < retries) {
180                        try {
181                                publishMessage(request, replyQueueName);
182                                return getResult(corrId, timeout, type);
183                        } catch (TimeoutException te) {
184                                System.out.println("Timeout exception catched " + te);
185                                te.printStackTrace();
186                        }
187                        i++;
188                }
189                throw new RetryException(retries, timeout);
190        }
191
192        private Request createRequest(Method method, Object[] arguments) {
193                String corrId = java.util.UUID.randomUUID().toString();
194                String methodName = method.getName();
195
196                // Since we need to know whether the method is async and if it has to
197                // return using an annotation, we'll only check the AsyncMethod
198                // annotation
199                if (method.getAnnotation(AsyncMethod.class) == null) {
200                        int retries = 1;
201                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
202                        if (method.getAnnotation(SyncMethod.class) != null) {
203                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
204                                retries = sync.retry();
205                                timeout = sync.timeout();
206                        }
207                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout);
208                } else {
209                        return Request.newAsyncRequest(corrId, methodName, arguments);
210                }
211        }
212
213        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
214                Response resp = null;
215
216                // Wait for the results.
217                long localTimeout = 0;
218                long start = System.currentTimeMillis();
219                synchronized (results) {
220                        // Due to we are using notifyAll(), we need to control the real time
221                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
222                                results.wait(timeout);
223                                localTimeout = System.currentTimeMillis() - start;
224                        }
225                        if ((timeout - localTimeout) <= 0) {
226                                throw new TimeoutException("Timeout exception time: " + timeout);
227                        }
228                        resp = Serializer.deserializeResponse(results.get(corrId), type);
229                        // Log.saveLog("Client-Deserialize", results.get(corrId));
230
231                        // Remove and indicate the key exists (a hashmap can contain a null
232                        // object, using this we'll know whether a response has been
233                        // received before)
234                        results.put(corrId, null);
235                }
236
237                if (resp.getError() != null) {
238                        OmqException error = resp.getError();
239                        String name = error.getType();
240                        String message = error.getMessage();
241                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
242                }
243
244                return resp.getResult();
245        }
246
247        /**
248         *
249         * @param reference
250         *            RemoteObject reference
251         * @return true if the proxy has been created before or false in the other
252         *         case
253         */
254        public static boolean containsProxy(String reference) {
255                return proxies.containsKey(reference);
256        }
257
258        /**
259         *
260         * @param reference
261         *            RemoteObject reference
262         * @return a proxy instance
263         * @throws NoContainsInstanceException
264         */
265        public static Object getInstance(String reference) throws NoContainsInstanceException {
266                if (!containsProxy(reference)) {
267                        throw new NoContainsInstanceException(reference);
268                }
269                return proxies.get(reference);
270        }
271
272        /**
273         * Returns an instance of a proxy class for the specified interfaces that
274         * dispatches method invocations to the specified invocation handler. * @param
275         * loader
276         *
277         * @param loader
278         *            the class loader to define the proxy class
279         *
280         * @param interfaces
281         *            the list of interfaces for the proxy class to implement
282         * @param proxy
283         *            the invocation handler to dispatch method invocations to
284         * @return a proxy instance with the specified invocation handler of a proxy
285         *         class that is defined by the specified class loader and that
286         *         implements the specified interfaces
287         */
288        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
289                if (proxies.containsKey(proxy.getRef())) {
290                        System.out.println("Proxy trobat");
291                        return proxies.get(proxy.getRef());
292                }
293                Object value = Proxy.newProxyInstance(loader, interfaces, proxy);
294                proxies.put(proxy.getRef(), value);
295                return value;
296        }
297
298        /**
299         * Gets the Map used internally to retreive the response of the server
300         *
301         * @return a map with all the keys processed. Every key is a correlation id
302         *         of a method invoked remotely
303         */
304        public Map<String, byte[]> getResults() {
305                return results;
306        }
307
308        @Override
309        public String getRef() {
310                return uid;
311        }
312
313        @Override
314        public void notifyEvent(Event event) throws IOException, SerializerException {
315        }
316
317        @Override
318        public void addListener(EventListener<?> eventListener) throws Exception {
319                if (eventListener.getTopic() == null) {
320                        eventListener.setTopic(uid);
321                }
322                listeners.put(eventListener.getTopic(), eventListener);
323                dispatcher.addListener(eventListener);
324        }
325
326        @Override
327        public void removeListener(EventListener<?> eventListener) throws Exception {
328                listeners.remove(eventListener.getTopic());
329                dispatcher.removeListener(eventListener);
330        }
331
332        @Override
333        public Collection<EventListener<?>> getListeners() throws Exception {
334                return listeners.values();
335        }
336
337}
Note: See TracBrowser for help on using the repository browser.