1 | package omq.client.proxy; |
---|
2 | |
---|
3 | import java.lang.reflect.InvocationHandler; |
---|
4 | import java.lang.reflect.Method; |
---|
5 | import java.util.Properties; |
---|
6 | |
---|
7 | import com.rabbitmq.client.AMQP.BasicProperties; |
---|
8 | |
---|
9 | import omq.common.broker.Broker; |
---|
10 | import omq.common.message.Request; |
---|
11 | import omq.common.util.ParameterQueue; |
---|
12 | import omq.common.util.Serializer; |
---|
13 | |
---|
14 | /** |
---|
15 | * TODO Aquesta classe s'eliminarà tant bon punt es faci un proxymq més |
---|
16 | * intel·ligent |
---|
17 | * |
---|
18 | * @author sergi |
---|
19 | * |
---|
20 | */ |
---|
21 | public class MultiProxymq implements InvocationHandler { |
---|
22 | private static final String multi = "multi#"; |
---|
23 | |
---|
24 | private String uid; |
---|
25 | private Broker broker; |
---|
26 | private Serializer serializer; |
---|
27 | private String replyQueueName; |
---|
28 | private String exchange; |
---|
29 | private static final String routingkey = ""; |
---|
30 | private transient String serializerType; |
---|
31 | |
---|
32 | public MultiProxymq(String uid, Class<?> clazz, Broker broker) throws Exception { |
---|
33 | this.uid = uid; |
---|
34 | this.broker = broker; |
---|
35 | serializer = broker.getSerializer(); |
---|
36 | |
---|
37 | Properties env = broker.getEnvironment(); |
---|
38 | replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); |
---|
39 | exchange = multi + env.getProperty(ParameterQueue.RPC_EXCHANGE); |
---|
40 | serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA); |
---|
41 | } |
---|
42 | |
---|
43 | @Override |
---|
44 | public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { |
---|
45 | String methodName = method.getName(); |
---|
46 | String corrId = java.util.UUID.randomUUID().toString(); |
---|
47 | boolean multi = true; |
---|
48 | |
---|
49 | Request request = Request.newAsyncRequest(corrId, methodName, args, multi); |
---|
50 | |
---|
51 | // Add the correlation ID and create a replyTo property |
---|
52 | BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build(); |
---|
53 | |
---|
54 | byte[] bytesRequest = serializer.serialize(serializerType, request); |
---|
55 | broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest); |
---|
56 | |
---|
57 | return null; |
---|
58 | } |
---|
59 | |
---|
60 | } |
---|