1 | package omq.client.proxy; |
---|
2 | |
---|
3 | import java.lang.reflect.InvocationHandler; |
---|
4 | import java.lang.reflect.Method; |
---|
5 | import java.util.Properties; |
---|
6 | |
---|
7 | import org.apache.log4j.Logger; |
---|
8 | |
---|
9 | import com.rabbitmq.client.AMQP.BasicProperties; |
---|
10 | |
---|
11 | import omq.common.broker.Broker; |
---|
12 | import omq.common.message.Request; |
---|
13 | import omq.common.util.ParameterQueue; |
---|
14 | import omq.common.util.Serializer; |
---|
15 | |
---|
16 | /** |
---|
17 | * MultiProxy class. Every proxy created with this class will invoke |
---|
18 | * multi-asynchronous methods. |
---|
19 | * |
---|
20 | * @author Sergi Toda <sergi.toda@estudiants.urv.cat> |
---|
21 | * |
---|
22 | */ |
---|
23 | public class MultiProxymq implements InvocationHandler { |
---|
24 | private static final Logger logger = Logger.getLogger(MultiProxymq.class.getName()); |
---|
25 | private static final String multi = "multi#"; |
---|
26 | |
---|
27 | private String uid; |
---|
28 | private Broker broker; |
---|
29 | private Serializer serializer; |
---|
30 | private String replyQueueName; |
---|
31 | private String exchange; |
---|
32 | private static final String routingkey = ""; |
---|
33 | private transient String serializerType; |
---|
34 | |
---|
35 | public MultiProxymq(String uid, Class<?> clazz, Broker broker) throws Exception { |
---|
36 | this.uid = uid; |
---|
37 | this.broker = broker; |
---|
38 | serializer = broker.getSerializer(); |
---|
39 | |
---|
40 | Properties env = broker.getEnvironment(); |
---|
41 | exchange = multi + uid; |
---|
42 | serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA); |
---|
43 | } |
---|
44 | |
---|
45 | @Override |
---|
46 | public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { |
---|
47 | String methodName = method.getName(); |
---|
48 | String corrId = java.util.UUID.randomUUID().toString(); |
---|
49 | boolean multi = true; |
---|
50 | |
---|
51 | Request request = Request.newAsyncRequest(corrId, methodName, args, multi); |
---|
52 | |
---|
53 | // Add the correlation ID and create a replyTo property |
---|
54 | BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).type(serializerType).build(); |
---|
55 | |
---|
56 | byte[] bytesRequest = serializer.serialize(serializerType, request); |
---|
57 | broker.publishMessge(exchange, routingkey, props, bytesRequest); |
---|
58 | |
---|
59 | logger.debug("Proxymq: " + uid + " invokes " + methodName + ", corrID" + corrId + ", exchange: " + exchange + ", replyQueue: " + replyQueueName |
---|
60 | + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()); |
---|
61 | |
---|
62 | return null; |
---|
63 | } |
---|
64 | |
---|
65 | } |
---|