Changeset 58
- Timestamp:
- 06/25/13 16:38:31 (11 years ago)
- Location:
- trunk/src
- Files:
-
- 1 added
- 1 deleted
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/client/proxy/Proxymq.java
r57 r58 2 2 3 3 import java.io.IOException; 4 import java.lang.reflect.Array; 4 5 import java.lang.reflect.InvocationHandler; 5 6 import java.lang.reflect.Method; 6 7 import java.lang.reflect.Proxy; 7 import java.util.ArrayList;8 8 import java.util.Collection; 9 9 import java.util.HashMap; 10 10 import java.util.Hashtable; 11 import java.util.List;12 11 import java.util.Map; 13 12 import java.util.Properties; 14 15 import org.apache.log4j.Logger;16 13 17 14 import omq.Remote; … … 34 31 import omq.exception.TimeoutException; 35 32 33 import org.apache.log4j.Logger; 34 36 35 import com.rabbitmq.client.AMQP.BasicProperties; 37 36 … … 190 189 try { 191 190 publishMessage(request, replyQueueName); 192 return getResult(corrId, timeout, type); 191 if (request.isMulti()) { 192 return getResults(corrId, 2, timeout, type); 193 } else { 194 return getResult(corrId, timeout, type); 195 } 196 193 197 } catch (TimeoutException te) { 194 198 logger.error(te); … … 203 207 String methodName = method.getName(); 204 208 boolean multi = false; 209 int wait = 0; 205 210 206 211 if (method.getAnnotation(MultiMethod.class) != null) { 207 212 multi = true; 213 wait = method.getAnnotation(MultiMethod.class).waitNum(); 208 214 } 209 215 … … 219 225 timeout = sync.timeout(); 220 226 } 221 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi );227 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait); 222 228 } else { 223 229 return Request.newAsyncRequest(corrId, methodName, arguments, multi); … … 229 235 230 236 // Wait for the results. 231 long localTimeout = 0;237 long localTimeout = timeout; 232 238 long start = System.currentTimeMillis(); 233 239 synchronized (results) { 234 240 // Due to we are using notifyAll(), we need to control the real time 235 241 while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) { 236 results.wait( timeout);242 results.wait(localTimeout); 237 243 localTimeout = System.currentTimeMillis() - start; 238 244 } … … 258 264 } 259 265 260 @SuppressWarnings("unused")261 266 private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception { 262 267 Response resp = null; 263 List<Object> list = new ArrayList<Object>(); 268 Class<?> actualType = type.getComponentType(); 269 270 Object array = Array.newInstance(actualType, wait); 264 271 265 272 int i = 0; … … 279 286 } 280 287 // Remove the corrId to receive new replies 281 resp = serializer.deserializeResponse(results.remove(corrId), type);282 list.add(resp.getResult());283 288 resp = serializer.deserializeResponse(results.remove(corrId), actualType); 289 System.out.println("/n/n/n/n/nResult type: "+resp.getResult()+" /n/n/n/n/n"); 290 Array.set(array, i, resp.getResult()); 284 291 } 285 292 i++; … … 289 296 } 290 297 291 return list;298 return array; 292 299 } 293 300 -
trunk/src/main/java/omq/common/message/Request.java
r55 r58 16 16 17 17 private transient boolean multi; 18 private transient int wait; 18 19 private transient long timeout; 19 20 private transient int retries; … … 47 48 } 48 49 49 public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi ) {50 public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi, int wait) { 50 51 Request req = new Request(id, method, false, params, multi); 51 52 req.setRetries(retries); 52 53 req.setTimeout(timeout); 54 req.setWait(wait); 53 55 return req; 54 56 } … … 113 115 this.multi = multi; 114 116 } 117 118 public int getWait() { 119 return wait; 120 } 121 122 public void setWait(int wait) { 123 this.wait = wait; 124 } 115 125 } -
trunk/src/main/java/omq/server/RemoteObject.java
r57 r58 39 39 40 40 private String UID; 41 private String multiQueue; 41 42 private Properties env; 42 43 private transient Broker broker; … … 66 67 this.broker = broker; 67 68 UID = reference; 69 multiQueue = UID + System.currentTimeMillis(); 68 70 env = broker.getEnvironment(); 69 71 serializer = broker.getSerializer(); … … 225 227 // Multi info 226 228 String multiExchange = multi + exchange; 227 //TODO: change this228 String multiQueue = UID + System.currentTimeMillis();229 229 230 230 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false")); -
trunk/src/test/java/omq/test/multiProcess/MultiProcessTest.java
r57 r58 21 21 public class MultiProcessTest { 22 22 public static Broker broker; 23 public static Number remoteNumber;23 public static NumberClient remoteNumber; 24 24 25 25 public MultiProcessTest(String type) throws Exception { … … 44 44 45 45 broker = new Broker(env); 46 remoteNumber = broker.lookup("number", Number .class);46 remoteNumber = broker.lookup("number", NumberClient.class); 47 47 } 48 48 … … 85 85 int x = 10; 86 86 remoteNumber.setNumber(x); 87 int a = remoteNumber.getNum er();87 int a = remoteNumber.getNumber(); 88 88 assertEquals(0, a); 89 int b = remoteNumber.getNum er();89 int b = remoteNumber.getNumber(); 90 90 assertEquals(x, b); 91 91 remoteNumber.setNumber(0); … … 98 98 remoteNumber.setMultiNumber(x); 99 99 Thread.sleep(200); 100 int a = remoteNumber.getNum er();101 int b = remoteNumber.getNum er();100 int a = remoteNumber.getNumber(); 101 int b = remoteNumber.getNumber(); 102 102 assertEquals(x, a); 103 103 assertEquals(x, b); 104 int[] number = remoteNumber.getMultiNumber(); 105 assertEquals(x, number[0]); 106 assertEquals(x, number[1]); 104 107 remoteNumber.setMultiNumber(0); 105 108 Thread.sleep(200); -
trunk/src/test/java/omq/test/multiProcess/Number.java
r54 r58 2 2 3 3 import omq.Remote; 4 import omq.client.annotation.AsyncMethod;5 import omq.client.annotation.MultiMethod;6 import omq.client.annotation.RemoteInterface;7 import omq.client.annotation.SyncMethod;8 4 9 @RemoteInterface10 5 public interface Number extends Remote { 11 @SyncMethod(timeout = 1000)12 6 public void setNumber(int x); 13 7 14 @SyncMethod(timeout = 1000) 15 public int getNumer(); 8 public int getNumber(); 16 9 17 @MultiMethod18 @AsyncMethod19 10 public void setMultiNumber(int x); 11 12 public int getMultiNumber(); 20 13 } -
trunk/src/test/java/omq/test/multiProcess/NumberImpl.java
r54 r58 1 1 package omq.test.multiProcess; 2 2 3 import omq.client.annotation.AsyncMethod;4 import omq.client.annotation.MultiMethod;5 import omq.client.annotation.SyncMethod;6 3 import omq.server.RemoteObject; 7 4 … … 20 17 } 21 18 22 @Override23 @SyncMethod24 19 public void setNumber(int x) { 25 20 this.x = x; 26 21 } 27 22 28 @Override 29 @SyncMethod(timeout = 1000) 30 public int getNumer() { 23 public int getNumber() { 31 24 return x; 32 25 } 33 26 34 @Override35 @MultiMethod36 @AsyncMethod37 27 public void setMultiNumber(int x) { 38 28 this.x = x; 39 29 } 40 30 31 public int getMultiNumber() { 32 return x; 33 } 34 41 35 }
Note: See TracChangeset
for help on using the changeset viewer.