[70] | 1 | package omq.client.proxy; |
---|
| 2 | |
---|
| 3 | import java.lang.reflect.InvocationHandler; |
---|
| 4 | import java.lang.reflect.Method; |
---|
| 5 | import java.util.Properties; |
---|
| 6 | |
---|
[75] | 7 | import org.apache.log4j.Logger; |
---|
| 8 | |
---|
[70] | 9 | import com.rabbitmq.client.AMQP.BasicProperties; |
---|
| 10 | |
---|
| 11 | import omq.common.broker.Broker; |
---|
| 12 | import omq.common.message.Request; |
---|
| 13 | import omq.common.util.ParameterQueue; |
---|
| 14 | import omq.common.util.Serializer; |
---|
| 15 | |
---|
| 16 | /** |
---|
[83] | 17 | * MultiProxy class. Every proxy created with this class will invoke |
---|
| 18 | * multi-asynchronous methods. |
---|
[70] | 19 | * |
---|
[83] | 20 | * @author Sergi Toda <sergi.toda@estudiants.urv.cat> |
---|
[70] | 21 | * |
---|
| 22 | */ |
---|
| 23 | public class MultiProxymq implements InvocationHandler { |
---|
[75] | 24 | private static final Logger logger = Logger.getLogger(MultiProxymq.class.getName()); |
---|
[70] | 25 | private static final String multi = "multi#"; |
---|
| 26 | |
---|
| 27 | private String uid; |
---|
| 28 | private Broker broker; |
---|
| 29 | private Serializer serializer; |
---|
| 30 | private String replyQueueName; |
---|
| 31 | private String exchange; |
---|
| 32 | private static final String routingkey = ""; |
---|
| 33 | private transient String serializerType; |
---|
| 34 | |
---|
| 35 | public MultiProxymq(String uid, Class<?> clazz, Broker broker) throws Exception { |
---|
| 36 | this.uid = uid; |
---|
| 37 | this.broker = broker; |
---|
| 38 | serializer = broker.getSerializer(); |
---|
| 39 | |
---|
| 40 | Properties env = broker.getEnvironment(); |
---|
[75] | 41 | exchange = multi + uid; |
---|
[77] | 42 | serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA); |
---|
[70] | 43 | } |
---|
| 44 | |
---|
| 45 | @Override |
---|
| 46 | public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { |
---|
| 47 | String methodName = method.getName(); |
---|
| 48 | String corrId = java.util.UUID.randomUUID().toString(); |
---|
| 49 | boolean multi = true; |
---|
| 50 | |
---|
| 51 | Request request = Request.newAsyncRequest(corrId, methodName, args, multi); |
---|
| 52 | |
---|
| 53 | // Add the correlation ID and create a replyTo property |
---|
[78] | 54 | BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).type(serializerType).build(); |
---|
[70] | 55 | |
---|
| 56 | byte[] bytesRequest = serializer.serialize(serializerType, request); |
---|
[98] | 57 | broker.publishMessge(exchange, routingkey, props, bytesRequest); |
---|
[70] | 58 | |
---|
[75] | 59 | logger.debug("Proxymq: " + uid + " invokes " + methodName + ", corrID" + corrId + ", exchange: " + exchange + ", replyQueue: " + replyQueueName |
---|
| 60 | + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()); |
---|
| 61 | |
---|
[70] | 62 | return null; |
---|
| 63 | } |
---|
| 64 | |
---|
| 65 | } |
---|