source: trunk/src/main/java/omq/client/proxy/Proxymq.java @ 49

Last change on this file since 49 was 49, checked in by stoda, 11 years ago

log4j added

File size: 11.0 KB
Line 
1package omq.client.proxy;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationHandler;
5import java.lang.reflect.Method;
6import java.lang.reflect.Proxy;
7import java.util.Collection;
8import java.util.HashMap;
9import java.util.Hashtable;
10import java.util.Map;
11import java.util.Properties;
12
13import org.apache.log4j.Logger;
14
15import omq.Remote;
16import omq.client.annotation.AsyncMethod;
17import omq.client.annotation.SyncMethod;
18import omq.client.listener.ResponseListener;
19import omq.common.broker.Broker;
20import omq.common.event.Event;
21import omq.common.event.EventDispatcher;
22import omq.common.event.EventListener;
23import omq.common.message.Request;
24import omq.common.message.Response;
25import omq.common.util.ParameterQueue;
26import omq.common.util.Serializer;
27import omq.exception.NoContainsInstanceException;
28import omq.exception.OmqException;
29import omq.exception.RetryException;
30import omq.exception.SerializerException;
31import omq.exception.TimeoutException;
32
33import com.rabbitmq.client.AMQP.BasicProperties;
34
35/**
36 * EvoProxy class. This class inherits from InvocationHandler and gives you a
37 * proxy with a server using an environment
38 *
39 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
40 *
41 */
42public class Proxymq implements InvocationHandler, Remote {
43
44        /**
45         *
46         */
47        private static final long serialVersionUID = 1L;
48        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
49        private static Map<String, Object> proxies = new Hashtable<String, Object>();
50
51        private String uid;
52        private transient String serializerType;
53        private transient ResponseListener rListener;
54        private transient EventDispatcher dispatcher;
55        // private transient Channel channel;
56        private transient Properties env;
57        private transient Map<String, byte[]> results;
58        private transient Map<String, EventListener<?>> listeners;
59
60        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
61        static {
62                primitiveClasses.put("byte", Byte.class);
63                primitiveClasses.put("short", Short.class);
64                primitiveClasses.put("char", Character.class);
65                primitiveClasses.put("int", Integer.class);
66                primitiveClasses.put("long", Long.class);
67                primitiveClasses.put("float", Float.class);
68                primitiveClasses.put("double", Double.class);
69        }
70
71        /**
72         * EvoProxy Constructor.
73         *
74         * This constructor uses an uid to know which object will call. It also uses
75         * Properties to set where to send the messages
76         *
77         * @param uid
78         *            The uid represents the unique identifier of a remote object
79         * @param clazz
80         *            It represents the real class of the remote object. With this
81         *            class the system can know the remoteInterface used and it can
82         *            also see which annotations are used
83         * @param env
84         *            The environment is used to know where to send the messages
85         * @throws Exception
86         */
87        public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception {
88                this.uid = uid;
89                this.rListener = ResponseListener.getRequestListener();
90                this.dispatcher = EventDispatcher.getDispatcher();
91
92                // TODO what is better to use a new channel or to use the same?
93                // this.channel = Broker.getChannel();
94                this.env = env;
95
96                // set the serializer type
97                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
98
99                listeners = new HashMap<String, EventListener<?>>();
100
101                // Create a new hashmap and registry it in rListener
102                results = new HashMap<String, byte[]>();
103                rListener.registerProxy(this);
104        }
105
106        @Override
107        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
108                // long timeStart = (new Date()).getTime();
109
110                // Local methods only
111                String methodName = method.getName();
112
113                // The local methods will be invoked here
114                if (method.getDeclaringClass().equals(Remote.class)) {
115                        if (methodName.equals("getRef")) {
116                                return getRef();
117                        } else if (methodName.equals("addListener")) {
118                                addListener((EventListener<?>) arguments[0]);
119                                return null;
120                        } else if (methodName.equals("removeListener")) {
121                                removeListener((EventListener<?>) arguments[0]);
122                                return null;
123                        } else if (methodName.equals("getListeners")) {
124                                return getListeners();
125                        }
126                }
127
128                // Create the request
129                Request request = createRequest(method, arguments);
130
131                // Log.saveTimeSendRequestLog("Client-time-request", request.getId(),
132                // method.getName(), timeStart);
133
134                Object response = null;
135                // Publish the request
136                if (request.isAsync()) {
137                        logger.debug("Publish async request -> " + request.getId());
138                        publishAsyncRequest(request);
139                } else {
140                        logger.debug("Publish sync request -> " + request.getId());
141                        response = publishSyncRequest(request, method.getReturnType());
142
143                        // long timeEnd = (new Date()).getTime();
144                        // Log.saveTimeSendRequestLog("Client-time-response",
145                        // request.getId(), method.getName(), timeEnd);
146                }
147
148                return response;
149        }
150
151        private void publishMessage(Request request, String replyQueueName) throws Exception {
152                String corrId = request.getId();
153
154                // Get the environment properties
155                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
156                String routingkey = this.uid;
157
158                // Add the correlation ID and create a replyTo property
159                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
160
161                // Publish the message
162                byte[] bytesRequest = Serializer.serialize(serializerType, request);
163                // TODO See this
164                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
165                Broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
166                // Log.saveLog("Client-Serialize", bytesRequest);
167        }
168
169        private void publishAsyncRequest(Request request) throws Exception {
170                // Get the environment properties
171                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
172                publishMessage(request, replyQueueName);
173        }
174
175        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
176                String corrId = request.getId();
177
178                int retries = request.getRetries();
179                long timeout = request.getTimeout();
180
181                // Get the environment properties
182                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
183
184                // Publish the message
185                int i = 0;
186                while (i < retries) {
187                        try {
188                                publishMessage(request, replyQueueName);
189                                return getResult(corrId, timeout, type);
190                        } catch (TimeoutException te) {
191                                logger.error(te);
192                        }
193                        i++;
194                }
195                throw new RetryException(retries, timeout);
196        }
197
198        private Request createRequest(Method method, Object[] arguments) {
199                String corrId = java.util.UUID.randomUUID().toString();
200                String methodName = method.getName();
201
202                // Since we need to know whether the method is async and if it has to
203                // return using an annotation, we'll only check the AsyncMethod
204                // annotation
205                if (method.getAnnotation(AsyncMethod.class) == null) {
206                        int retries = 1;
207                        long timeout = ParameterQueue.DEFAULT_TIMEOUT;
208                        if (method.getAnnotation(SyncMethod.class) != null) {
209                                SyncMethod sync = method.getAnnotation(SyncMethod.class);
210                                retries = sync.retry();
211                                timeout = sync.timeout();
212                        }
213                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout);
214                } else {
215                        return Request.newAsyncRequest(corrId, methodName, arguments);
216                }
217        }
218
219        private Object getResult(String corrId, long timeout, Class<?> type) throws Exception {
220                Response resp = null;
221
222                // Wait for the results.
223                long localTimeout = 0;
224                long start = System.currentTimeMillis();
225                synchronized (results) {
226                        // Due to we are using notifyAll(), we need to control the real time
227                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
228                                results.wait(timeout);
229                                localTimeout = System.currentTimeMillis() - start;
230                        }
231                        if ((timeout - localTimeout) <= 0) {
232                                throw new TimeoutException("Timeout exception time: " + timeout);
233                        }
234                        resp = Serializer.deserializeResponse(results.get(corrId), type);
235                        // Log.saveLog("Client-Deserialize", results.get(corrId));
236
237                        // Remove and indicate the key exists (a hashmap can contain a null
238                        // object, using this we'll know whether a response has been
239                        // received before)
240                        results.put(corrId, null);
241                }
242
243                if (resp.getError() != null) {
244                        OmqException error = resp.getError();
245                        String name = error.getType();
246                        String message = error.getMessage();
247                        throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message);
248                }
249
250                return resp.getResult();
251        }
252
253        /**
254         *
255         * @param reference
256         *            RemoteObject reference
257         * @return true if the proxy has been created before or false in the other
258         *         case
259         */
260        public static boolean containsProxy(String reference) {
261                return proxies.containsKey(reference);
262        }
263
264        /**
265         *
266         * @param reference
267         *            RemoteObject reference
268         * @return a proxy instance
269         * @throws NoContainsInstanceException
270         */
271        public static Object getInstance(String reference) throws NoContainsInstanceException {
272                if (!containsProxy(reference)) {
273                        throw new NoContainsInstanceException(reference);
274                }
275                return proxies.get(reference);
276        }
277
278        /**
279         * Returns an instance of a proxy class for the specified interfaces that
280         * dispatches method invocations to the specified invocation handler. * @param
281         * loader
282         *
283         * @param loader
284         *            the class loader to define the proxy class
285         *
286         * @param interfaces
287         *            the list of interfaces for the proxy class to implement
288         * @param proxy
289         *            the invocation handler to dispatch method invocations to
290         * @return a proxy instance with the specified invocation handler of a proxy
291         *         class that is defined by the specified class loader and that
292         *         implements the specified interfaces
293         */
294        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
295                if (proxies.containsKey(proxy.getRef())) {
296                        return proxies.get(proxy.getRef());
297                }
298                Object value = Proxy.newProxyInstance(loader, interfaces, proxy);
299                proxies.put(proxy.getRef(), value);
300                return value;
301        }
302
303        /**
304         * Gets the Map used internally to retreive the response of the server
305         *
306         * @return a map with all the keys processed. Every key is a correlation id
307         *         of a method invoked remotely
308         */
309        public Map<String, byte[]> getResults() {
310                return results;
311        }
312
313        public static void stopProxy() {
314                proxies = new HashMap<String, Object>();
315        }
316
317        public static Map<String, Object> getProxies() {
318                return proxies;
319        }
320
321        public static void setProxies(Map<String, Object> proxies) {
322                Proxymq.proxies = proxies;
323        }
324
325        @Override
326        public String getRef() {
327                return uid;
328        }
329
330        @Override
331        public void notifyEvent(Event event) throws IOException, SerializerException {
332        }
333
334        @Override
335        public void addListener(EventListener<?> eventListener) throws Exception {
336                if (eventListener.getTopic() == null) {
337                        eventListener.setTopic(uid);
338                }
339                listeners.put(eventListener.getTopic(), eventListener);
340                dispatcher.addListener(eventListener);
341        }
342
343        @Override
344        public void removeListener(EventListener<?> eventListener) throws Exception {
345                listeners.remove(eventListener.getTopic());
346                dispatcher.removeListener(eventListener);
347        }
348
349        @Override
350        public Collection<EventListener<?>> getListeners() throws Exception {
351                return listeners.values();
352        }
353
354}
Note: See TracBrowser for help on using the repository browser.