source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 47

Last change on this file since 47 was 47, checked in by stoda, 11 years ago

Refactoring Environment class - deleted.
StopBroker? problems solved (?)
Server can receive send and receive messages in different formats.
Some tests modified

TODO: finish all the tests, add log4j

File size: 11.0 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.lang.reflect.Proxy;
7import java.util.Collection;
8import java.util.HashMap;
9import java.util.Hashtable;
10import java.util.Map;
11import java.util.Properties;
12
13import omq.Remote;
14import omq.client.annotation.AsyncMethod;
15import omq.client.annotation.SyncMethod;
16import omq.client.listener.ResponseListener;
17import omq.common.broker.Broker;
18import omq.common.event.Event;
19import omq.common.event.EventDispatcher;
20import omq.common.event.EventListener;
21import omq.common.message.Request;
22import omq.common.message.Response;
23import omq.common.util.ParameterQueue;
24import omq.common.util.Serializer;
25import omq.exception.NoContainsInstanceException;
26import omq.exception.OmqException;
27import omq.exception.RetryException;
28import omq.exception.SerializerException;
29import omq.exception.TimeoutException;
30
31import com.rabbitmq.client.AMQP.BasicProperties;
32
33/**
34 * EvoProxy class. This class inherits from InvocationHandler and gives you a
35 * proxy with a server using an environment
36 *
37 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
38 *
39 */
40public class Proxymq implements InvocationHandler, Remote {
41
42        /**
43         *
44         */
45        private static final long serialVersionUID = 1L;
46        private static Map<String, Object> proxies = new Hashtable<String, Object>();
47
48        private String uid;
49        private transient String serializerType;
50        private transient ResponseListener rListener;
51        private transient EventDispatcher dispatcher;
52        // private transient Channel channel;
53        private transient Properties env;
54        private transient Map<String, byte[]> results;
55        private transient Map<String, EventListener<?>> listeners;
56
57        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
58        static {
59                primitiveClasses.put("byte", Byte.class);
60                primitiveClasses.put("short", Short.class);
61                primitiveClasses.put("char", Character.class);
62                primitiveClasses.put("int", Integer.class);
63                primitiveClasses.put("long", Long.class);
64                primitiveClasses.put("float", Float.class);
65                primitiveClasses.put("double", Double.class);
66        }
67
68        /**
69         * EvoProxy Constructor.
70         *
71         * This constructor uses an uid to know which object will call. It also uses
72         * Properties to set where to send the messages
73         *
74         * @param uid
75         *            The uid represents the unique identifier of a remote object
76         * @param clazz
77         *            It represents the real class of the remote object. With this
78         *            class the system can know the remoteInterface used and it can
79         *            also see which annotations are used
80         * @param env
81         *            The environment is used to know where to send the messages
82         * @throws Exception
83         */
84        public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception {
85                this.uid = uid;
86                this.rListener = ResponseListener.getRequestListener();
87                this.dispatcher = EventDispatcher.getDispatcher();
88
89                // TODO what is better to use a new channel or to use the same?
90                // this.channel = Broker.getChannel();
91                this.env = env;
92
93                // set the serializer type
94                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
95
96                listeners = new HashMap<String, EventListener<?>>();
97
98                // Create a new hashmap and registry it in rListener
99                results = new HashMap<String, byte[]>();
100                rListener.registerProxy(this);
101        }
102
103        @Override
104        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
105                // long timeStart = (new Date()).getTime();
106
107                // Local methods only
108                String methodName = method.getName();
109
110                // The local methods will be invoked here
111                if (method.getDeclaringClass().equals(Remote.class)) {
112                        if (methodName.equals("getRef")) {
113                                return getRef();
114                        } else if (methodName.equals("addListener")) {
115                                addListener((EventListener<?>) arguments[0]);
116                                return null;
117                        } else if (methodName.equals("removeListener")) {
118                                removeListener((EventListener<?>) arguments[0]);
119                                return null;
120                        } else if (methodName.equals("getListeners")) {
121                                return getListeners();
122                        }
123                }
124
125                // Create the request
126                Request request = createRequest(method, arguments);
127
128                // Log.saveTimeSendRequestLog("Client-time-request", request.getId(),
129                // method.getName(), timeStart);
130
131                Object response = null;
132                // Publish the request
133                if (request.isAsync()) {
134                        System.out.println("Publish async request -> " + request.getId());
135                        publishAsyncRequest(request);
136                } else {
137                        System.out.println("Publish sync request -> " + request.getId());
138                        response = publishSyncRequest(request, method.getReturnType());
139
140                        // long timeEnd = (new Date()).getTime();
141                        // Log.saveTimeSendRequestLog("Client-time-response",
142                        // request.getId(), method.getName(), timeEnd);
143                }
144
145                return response;
146        }
147
148        private void publishMessage(Request request, String replyQueueName) throws Exception {
149                String corrId = request.getId();
150
151                // Get the environment properties
152                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
153                String routingkey = this.uid;
154
155                // Add the correlation ID and create a replyTo property
156                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
157
158                // Publish the message
159                byte[] bytesRequest = Serializer.serialize(serializerType, request);
160                // TODO See this
161                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
162                Broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
163                // Log.saveLog("Client-Serialize", bytesRequest);
164        }
165
166        private void publishAsyncRequest(Request request) throws Exception {
167                // Get the environment properties
168                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
169                publishMessage(request, replyQueueName);
170        }
171
172        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
173                String corrId = request.getId();
174
175                int retries = request.getRetries();
176                long timeout = request.getTimeout();
177
178                // Get the environment properties
179                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
180
181                // Publish the message
182                int i = 0;
183                while (i < retries) {
184                        try {
185                                publishMessage(request, replyQueueName);
186                                return getResult(corrId, timeout, type);
187                        } catch (TimeoutException te) {
188                                System.out.println("Timeout exception catched " + te);
189                                te.printStackTrace();
190                        }
191                        i++;
192                }
193                throw new RetryException(retries, timeout);
194        }
195
196        private Request createRequest(Method method, Object[] arguments) {
197                String corrId = java.util.UUID.randomUUID().toString();
198                String methodName = method.getName();
199
200                // Since we need to know whether the method is async and if it has to
201                // return using an annotation, we'll only check the AsyncMethod
202                // annotation
203                if (method.getAnnotation(AsyncMethod.class) == null) {
204                        int retries = 1;
205                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
206                        if (method.getAnnotation(SyncMethod.class) != null) {
207                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
208                                retries = sync.retry();
209                                timeout = sync.timeout();
210                        }
211                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout);
212                } else {
213                        return Request.newAsyncRequest(corrId, methodName, arguments);
214                }
215        }
216
217        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
218                Response resp = null;
219
220                // Wait for the results.
221                long localTimeout = 0;
222                long start = System.currentTimeMillis();
223                synchronized (results) {
224                        // Due to we are using notifyAll(), we need to control the real time
225                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
226                                results.wait(timeout);
227                                localTimeout = System.currentTimeMillis() - start;
228                        }
229                        if ((timeout - localTimeout) <= 0) {
230                                throw new TimeoutException("Timeout exception time: " + timeout);
231                        }
232                        resp = Serializer.deserializeResponse(results.get(corrId), type);
233                        // Log.saveLog("Client-Deserialize", results.get(corrId));
234
235                        // Remove and indicate the key exists (a hashmap can contain a null
236                        // object, using this we'll know whether a response has been
237                        // received before)
238                        results.put(corrId, null);
239                }
240
241                if (resp.getError() != null) {
242                        OmqException error = resp.getError();
243                        String name = error.getType();
244                        String message = error.getMessage();
245                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
246                }
247
248                return resp.getResult();
249        }
250
251        /**
252         *
253         * @param reference
254         *            RemoteObject reference
255         * @return true if the proxy has been created before or false in the other
256         *         case
257         */
258        public static boolean containsProxy(String reference) {
259                return proxies.containsKey(reference);
260        }
261
262        /**
263         *
264         * @param reference
265         *            RemoteObject reference
266         * @return a proxy instance
267         * @throws NoContainsInstanceException
268         */
269        public static Object getInstance(String reference) throws NoContainsInstanceException {
270                if (!containsProxy(reference)) {
271                        throw new NoContainsInstanceException(reference);
272                }
273                return proxies.get(reference);
274        }
275
276        /**
277         * Returns an instance of a proxy class for the specified interfaces that
278         * dispatches method invocations to the specified invocation handler. * @param
279         * loader
280         *
281         * @param loader
282         *            the class loader to define the proxy class
283         *
284         * @param interfaces
285         *            the list of interfaces for the proxy class to implement
286         * @param proxy
287         *            the invocation handler to dispatch method invocations to
288         * @return a proxy instance with the specified invocation handler of a proxy
289         *         class that is defined by the specified class loader and that
290         *         implements the specified interfaces
291         */
292        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
293                if (proxies.containsKey(proxy.getRef())) {
294                        System.out.println("Proxy trobat");
295                        return proxies.get(proxy.getRef());
296                }
297                Object value = Proxy.newProxyInstance(loader, interfaces, proxy);
298                proxies.put(proxy.getRef(), value);
299                return value;
300        }
301
302        /**
303         * Gets the Map used internally to retreive the response of the server
304         *
305         * @return a map with all the keys processed. Every key is a correlation id
306         *         of a method invoked remotely
307         */
308        public Map<String, byte[]> getResults() {
309                return results;
310        }
311
312        public static void stopProxy() {
313                proxies = new HashMap<String, Object>();
314        }
315
316        public static Map<String, Object> getProxies() {
317                return proxies;
318        }
319
320        public static void setProxies(Map<String, Object> proxies) {
321                Proxymq.proxies = proxies;
322        }
323
324        @Override
325        public String getRef() {
326                return uid;
327        }
328
329        @Override
330        public void notifyEvent(Event event) throws IOException, SerializerException {
331        }
332
333        @Override
334        public void addListener(EventListener<?> eventListener) throws Exception {
335                if (eventListener.getTopic() == null) {
336                        eventListener.setTopic(uid);
337                }
338                listeners.put(eventListener.getTopic(), eventListener);
339                dispatcher.addListener(eventListener);
340        }
341
342        @Override
343        public void removeListener(EventListener<?> eventListener) throws Exception {
344                listeners.remove(eventListener.getTopic());
345                dispatcher.removeListener(eventListener);
346        }
347
348        @Override
349        public Collection<EventListener<?>> getListeners() throws Exception {
350                return listeners.values();
351        }
352
353}
Note: See TracBrowser for help on using the repository browser.