Changeset 55
- Timestamp:
- 06/21/13 16:55:58 (11 years ago)
- Location:
- trunk
- Files:
-
- 1 added
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/client/proxy/Proxymq.java
r54 r55 48 48 private static final long serialVersionUID = 1L; 49 49 private static final Logger logger = Logger.getLogger(Proxymq.class.getName()); 50 private static final String multi = "multi#"; 50 51 private static Map<String, Object> proxies = new Hashtable<String, Object>(); 51 52 … … 56 57 private transient EventDispatcher dispatcher; 57 58 private transient Serializer serializer; 58 // private transient Channel channel;59 59 private transient Properties env; 60 60 private transient Map<String, byte[]> results; … … 111 111 @Override 112 112 public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { 113 // long timeStart = (new Date()).getTime();114 115 113 // Local methods only 116 114 String methodName = method.getName(); … … 151 149 152 150 // Get the environment properties 153 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE); 154 String routingkey = this.uid; 151 String exchange; 152 String routingkey; 153 154 if (request.isMulti()) { 155 exchange = multi + env.getProperty(ParameterQueue.RPC_EXCHANGE); 156 routingkey = ""; 157 } else { 158 exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE); 159 routingkey = uid; 160 } 155 161 156 162 // Add the correlation ID and create a replyTo property … … 159 165 // Publish the message 160 166 byte[] bytesRequest = serializer.serialize(serializerType, request); 161 // TODO See this162 // channel.basicPublish(exchange, routingkey, props, bytesRequest);163 167 broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest); 164 168 } … … 213 217 timeout = sync.timeout(); 214 218 } 215 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout );219 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi); 216 220 } else { 217 221 return Request.newAsyncRequest(corrId, methodName, arguments, multi); -
trunk/src/main/java/omq/common/message/Request.java
r54 r55 35 35 } 36 36 37 public Request(String id2, String method2, boolean b, Object[] params2, boolean multi2) { 38 // TODO Auto-generated constructor stub 37 private Request(String id, String method, boolean async, Object[] params, boolean multi) { 38 this.id = id; 39 this.method = method; 40 this.async = async; 41 this.params = params; 42 this.multi = multi; 39 43 } 40 44 … … 43 47 } 44 48 45 public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout ) {46 Request req = new Request(id, method, false, params );49 public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi) { 50 Request req = new Request(id, method, false, params, multi); 47 51 req.setRetries(retries); 48 52 req.setTimeout(timeout); -
trunk/src/main/java/omq/server/RemoteObject.java
r54 r55 223 223 String queue = UID; 224 224 String routingKey = UID; 225 // Multi info 226 String multiExchange = multi + exchange; 227 String multiQueue = UID + System.currentTimeMillis(); 228 225 229 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false")); 226 230 … … 231 235 logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue); 232 236 channel.exchangeDeclare(exchange, "direct"); 233 channel.exchangeDeclare(multi + exchange, "fanout");234 237 channel.queueDeclare(queue, durable, false, false, null); 235 238 channel.queueBind(queue, exchange, routingKey); 236 channel.queueBind(queue, multi + exchange, routingKey); 239 240 channel.exchangeDeclare(multiExchange, "fanout"); 241 channel.queueDeclare(multiQueue, durable, false, false, null); 242 channel.queueBind(multiQueue, multiExchange, ""); 237 243 238 244 // Declare the event topic fanout … … 243 249 consumer = new QueueingConsumer(channel); 244 250 channel.basicConsume(queue, true, consumer); 251 channel.basicConsume(multiQueue, true, consumer); 245 252 } 246 253 -
trunk/src/test/java/omq/test/multiProcess/MultiProcessTest.java
r54 r55 90 90 assertEquals(x, b); 91 91 remoteNumber.setNumber(0); 92 remoteNumber.setNumber(0); 92 93 } 93 94
Note: See TracChangeset
for help on using the changeset viewer.