source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 53

Last change on this file since 53 was 53, checked in by stoda, 11 years ago

Non static broker
TODO: change all test to see whether the new broker configuration works

File size: 11.2 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.lang.reflect.Proxy;
7import java.util.Collection;
8import java.util.HashMap;
9import java.util.Hashtable;
10import java.util.Map;
11import java.util.Properties;
12
13import org.apache.log4j.Logger;
14
15import omq.Remote;
16import omq.client.annotation.AsyncMethod;
17import omq.client.annotation.SyncMethod;
18import omq.client.listener.ResponseListener;
19import omq.common.broker.Broker;
20import omq.common.event.Event;
21import omq.common.event.EventDispatcher;
22import omq.common.event.EventListener;
23import omq.common.message.Request;
24import omq.common.message.Response;
25import omq.common.util.ParameterQueue;
26import omq.common.util.Serializer;
27import omq.exception.NoContainsInstanceException;
28import omq.exception.OmqException;
29import omq.exception.RetryException;
30import omq.exception.SerializerException;
31import omq.exception.TimeoutException;
32
33import com.rabbitmq.client.AMQP.BasicProperties;
34
35/**
36 * EvoProxy class. This class inherits from InvocationHandler and gives you a
37 * proxy with a server using an environment
38 *
39 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
40 *
41 */
42public class Proxymq implements InvocationHandler, Remote {
43
44        /**
45         *
46         */
47        private static final long serialVersionUID = 1L;
48        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
49        private static Map<String, Object> proxies = new Hashtable<String, Object>();
50
51        private String uid;
52        private transient String serializerType;
53        private transient Broker broker;
54        private transient ResponseListener rListener;
55        private transient EventDispatcher dispatcher;
56        private transient Serializer serializer;
57        // private transient Channel channel;
58        private transient Properties env;
59        private transient Map<String, byte[]> results;
60        private transient Map<String, EventListener<?>> listeners;
61
62        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
63        static {
64                primitiveClasses.put("byte", Byte.class);
65                primitiveClasses.put("short", Short.class);
66                primitiveClasses.put("char", Character.class);
67                primitiveClasses.put("int", Integer.class);
68                primitiveClasses.put("long", Long.class);
69                primitiveClasses.put("float", Float.class);
70                primitiveClasses.put("double", Double.class);
71        }
72
73        /**
74         * EvoProxy Constructor.
75         *
76         * This constructor uses an uid to know which object will call. It also uses
77         * Properties to set where to send the messages
78         *
79         * @param uid
80         *            The uid represents the unique identifier of a remote object
81         * @param clazz
82         *            It represents the real class of the remote object. With this
83         *            class the system can know the remoteInterface used and it can
84         *            also see which annotations are used
85         * @param env
86         *            The environment is used to know where to send the messages
87         * @throws Exception
88         */
89        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
90                this.uid = uid;
91                this.broker = broker;
92                rListener = broker.getResponseListener();
93                dispatcher = broker.getEventDispatcher();
94                serializer = broker.getSerializer();
95
96                // TODO what is better to use a new channel or to use the same?
97                // this.channel = Broker.getChannel();
98                env = broker.getEnvironment();
99
100                // set the serializer type
101                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
102
103                listeners = new HashMap<String, EventListener<?>>();
104
105                // Create a new hashmap and registry it in rListener
106                results = new HashMap<String, byte[]>();
107                rListener.registerProxy(this);
108        }
109
110        @Override
111        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
112                // long timeStart = (new Date()).getTime();
113
114                // Local methods only
115                String methodName = method.getName();
116
117                // The local methods will be invoked here
118                if (method.getDeclaringClass().equals(Remote.class)) {
119                        if (methodName.equals("getRef")) {
120                                return getRef();
121                        } else if (methodName.equals("addListener")) {
122                                addListener((EventListener<?>) arguments[0]);
123                                return null;
124                        } else if (methodName.equals("removeListener")) {
125                                removeListener((EventListener<?>) arguments[0]);
126                                return null;
127                        } else if (methodName.equals("getListeners")) {
128                                return getListeners();
129                        }
130                }
131
132                // Create the request
133                Request request = createRequest(method, arguments);
134
135                // Log.saveTimeSendRequestLog("Client-time-request", request.getId(),
136                // method.getName(), timeStart);
137
138                Object response = null;
139                // Publish the request
140                if (request.isAsync()) {
141                        logger.debug("Publish async request -> " + request.getId());
142                        publishAsyncRequest(request);
143                } else {
144                        logger.debug("Publish sync request -> " + request.getId());
145                        response = publishSyncRequest(request, method.getReturnType());
146
147                        // long timeEnd = (new Date()).getTime();
148                        // Log.saveTimeSendRequestLog("Client-time-response",
149                        // request.getId(), method.getName(), timeEnd);
150                }
151
152                return response;
153        }
154
155        private void publishMessage(Request request, String replyQueueName) throws Exception {
156                String corrId = request.getId();
157
158                // Get the environment properties
159                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
160                String routingkey = this.uid;
161
162                // Add the correlation ID and create a replyTo property
163                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
164
165                // Publish the message
166                byte[] bytesRequest = serializer.serialize(serializerType, request);
167                // TODO See this
168                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
169                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
170                // Log.saveLog("Client-Serialize", bytesRequest);
171        }
172
173        private void publishAsyncRequest(Request request) throws Exception {
174                // Get the environment properties
175                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
176                publishMessage(request, replyQueueName);
177        }
178
179        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
180                String corrId = request.getId();
181
182                int retries = request.getRetries();
183                long timeout = request.getTimeout();
184
185                // Get the environment properties
186                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
187
188                // Publish the message
189                int i = 0;
190                while (i < retries) {
191                        try {
192                                publishMessage(request, replyQueueName);
193                                return getResult(corrId, timeout, type);
194                        } catch (TimeoutException te) {
195                                logger.error(te);
196                        }
197                        i++;
198                }
199                throw new RetryException(retries, timeout);
200        }
201
202        private Request createRequest(Method method, Object[] arguments) {
203                String corrId = java.util.UUID.randomUUID().toString();
204                String methodName = method.getName();
205
206                // Since we need to know whether the method is async and if it has to
207                // return using an annotation, we'll only check the AsyncMethod
208                // annotation
209                if (method.getAnnotation(AsyncMethod.class) == null) {
210                        int retries = 1;
211                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
212                        if (method.getAnnotation(SyncMethod.class) != null) {
213                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
214                                retries = sync.retry();
215                                timeout = sync.timeout();
216                        }
217                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout);
218                } else {
219                        return Request.newAsyncRequest(corrId, methodName, arguments);
220                }
221        }
222
223        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
224                Response resp = null;
225
226                // Wait for the results.
227                long localTimeout = 0;
228                long start = System.currentTimeMillis();
229                synchronized (results) {
230                        // Due to we are using notifyAll(), we need to control the real time
231                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
232                                results.wait(timeout);
233                                localTimeout = System.currentTimeMillis() - start;
234                        }
235                        if ((timeout - localTimeout) <= 0) {
236                                throw new TimeoutException("Timeout exception time: " + timeout);
237                        }
238                        resp = serializer.deserializeResponse(results.get(corrId), type);
239                        // Log.saveLog("Client-Deserialize", results.get(corrId));
240
241                        // Remove and indicate the key exists (a hashmap can contain a null
242                        // object, using this we'll know whether a response has been
243                        // received before)
244                        results.put(corrId, null);
245                }
246
247                if (resp.getError() != null) {
248                        OmqException error = resp.getError();
249                        String name = error.getType();
250                        String message = error.getMessage();
251                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
252                }
253
254                return resp.getResult();
255        }
256
257        /**
258         *
259         * @param reference
260         *            RemoteObject reference
261         * @return true if the proxy has been created before or false in the other
262         *         case
263         */
264        public static boolean containsProxy(String reference) {
265                return proxies.containsKey(reference);
266        }
267
268        /**
269         *
270         * @param reference
271         *            RemoteObject reference
272         * @return a proxy instance
273         * @throws NoContainsInstanceException
274         */
275        public static Object getInstance(String reference) throws NoContainsInstanceException {
276                if (!containsProxy(reference)) {
277                        throw new NoContainsInstanceException(reference);
278                }
279                return proxies.get(reference);
280        }
281
282        /**
283         * Returns an instance of a proxy class for the specified interfaces that
284         * dispatches method invocations to the specified invocation handler. * @param
285         * loader
286         *
287         * @param loader
288         *            the class loader to define the proxy class
289         *
290         * @param interfaces
291         *            the list of interfaces for the proxy class to implement
292         * @param proxy
293         *            the invocation handler to dispatch method invocations to
294         * @return a proxy instance with the specified invocation handler of a proxy
295         *         class that is defined by the specified class loader and that
296         *         implements the specified interfaces
297         */
298        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
299                if (proxies.containsKey(proxy.getRef())) {
300                        return proxies.get(proxy.getRef());
301                }
302                Object value = Proxy.newProxyInstance(loader, interfaces, proxy);
303                proxies.put(proxy.getRef(), value);
304                return value;
305        }
306
307        /**
308         * Gets the Map used internally to retreive the response of the server
309         *
310         * @return a map with all the keys processed. Every key is a correlation id
311         *         of a method invoked remotely
312         */
313        public Map<String, byte[]> getResults() {
314                return results;
315        }
316
317        public static void stopProxy() {
318                proxies = new HashMap<String, Object>();
319        }
320
321        public static Map<String, Object> getProxies() {
322                return proxies;
323        }
324
325        public static void setProxies(Map<String, Object> proxies) {
326                Proxymq.proxies = proxies;
327        }
328
329        @Override
330        public String getRef() {
331                return uid;
332        }
333
334        @Override
335        public void notifyEvent(Event event) throws IOException, SerializerException {
336        }
337
338        @Override
339        public void addListener(EventListener<?> eventListener) throws Exception {
340                if (eventListener.getTopic() == null) {
341                        eventListener.setTopic(uid);
342                }
343                listeners.put(eventListener.getTopic(), eventListener);
344                dispatcher.addListener(eventListener);
345        }
346
347        @Override
348        public void removeListener(EventListener<?> eventListener) throws Exception {
349                listeners.remove(eventListener.getTopic());
350                dispatcher.removeListener(eventListener);
351        }
352
353        @Override
354        public Collection<EventListener<?>> getListeners() throws Exception {
355                return listeners.values();
356        }
357
358}
Note: See TracBrowser for help on using the repository browser.