Changeset 104
- Timestamp:
- 10/16/13 13:01:04 (11 years ago)
- Location:
- branches/supervisor
- Files:
-
- 2 added
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/client/annotation/MultiMethod.java
r83 r104 15 15 @Target(ElementType.METHOD) 16 16 public @interface MultiMethod { 17 /**18 * If @MultiMethod is followed by @SyncMethod waitNum indicates how many19 * responses we will wait for.20 *21 * @return length of the array of responses we are waiting for.22 */23 int waitNum() default 1;24 17 } -
branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java
r99 r104 4 4 import java.lang.reflect.InvocationHandler; 5 5 import java.lang.reflect.Method; 6 import java.util.ArrayList; 6 7 import java.util.HashMap; 8 import java.util.List; 7 9 import java.util.Map; 8 10 import java.util.Properties; … … 201 203 publishMessage(request, replyQueueName); 202 204 if (request.isMulti()) { 203 return getResults(corrId, request.getWait(),timeout, type);205 return getResults(corrId, timeout, type); 204 206 } else { 205 207 return getResult(corrId, timeout, type); … … 228 230 String methodName = method.getName(); 229 231 boolean multi = false; 230 int wait = 0;231 232 232 233 if (method.getAnnotation(MultiMethod.class) != null) { 233 234 multi = true; 234 wait = method.getAnnotation(MultiMethod.class).waitNum();235 235 } 236 236 … … 246 246 timeout = sync.timeout(); 247 247 } 248 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi , wait);248 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi); 249 249 } else { 250 250 return Request.newAsyncRequest(corrId, methodName, arguments, multi); … … 291 291 * @param corrId 292 292 * - Correlation Id of the request 293 * @param wait294 * - Array length295 293 * @param timeout 296 294 * - Timeout read in @SyncMethod.timeout(). If the timeout is set … … 302 300 * @throws Exception 303 301 */ 304 private Object getResults(String corrId, int wait,long timeout, Class<?> type) throws Exception {302 private Object getResults(String corrId, long timeout, Class<?> type) throws Exception { 305 303 Response resp = null; 306 304 // Get the component type of an array 307 305 Class<?> actualType = type.getComponentType(); 308 306 309 Object array = Array.newInstance(actualType, wait);307 List<Object> list = new ArrayList<Object>(); 310 308 311 309 int i = 0; … … 313 311 long start = System.currentTimeMillis(); 314 312 315 while ( i < wait) {313 while (true) { 316 314 synchronized (results) { 317 315 // Due to we are using notifyAll(), we need to control the real … … 322 320 } 323 321 if ((timeout - localTimeout) <= 0) { 324 throw new TimeoutException("Timeout exception time: " + timeout);322 break; 325 323 } 326 324 // Remove the corrId to receive new replies 327 325 resp = serializer.deserializeResponse(results.remove(corrId), actualType); 328 Array.set(array, i,resp.getResult());326 list.add(resp.getResult()); 329 327 } 330 328 i++; 331 329 } 330 331 if (i == 0) { 332 results.remove(corrId); 333 throw new TimeoutException("Timeout exception time: " + timeout); 334 } 335 332 336 synchronized (results) { 333 337 results.put(corrId, null); 338 } 339 340 Object array = Array.newInstance(actualType, i); 341 i = 0; 342 for (Object o : list) { 343 Array.set(array, i++, o); 334 344 } 335 345 -
branches/supervisor/src/main/java/omq/common/message/Request.java
r83 r104 25 25 26 26 private transient boolean multi; 27 private transient int wait;28 27 private transient long timeout; 29 28 private transient int retries; … … 78 77 * @param multi 79 78 * - If the method is multi 80 * @param wait81 * - If the method is multi how many responses will be listened82 79 * @return - new SyncRequest 83 80 */ 84 public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi , int wait) {81 public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi) { 85 82 Request req = new Request(id, method, false, params, multi); 86 83 req.setRetries(retries); 87 84 req.setTimeout(timeout); 88 req.setWait(wait);89 85 return req; 90 86 } … … 163 159 } 164 160 165 public int getWait() {166 return wait;167 }168 169 public void setWait(int wait) {170 this.wait = wait;171 }172 161 } -
branches/supervisor/src/test/java/omq/test/multiProcess/NumberClient.java
r83 r104 24 24 public void setMultiNumber(int x); 25 25 26 @MultiMethod( waitNum = 2)26 @MultiMethod() 27 27 @SyncMethod(timeout = 1000) 28 28 public int[] getMultiNumber();
Note: See TracChangeset
for help on using the changeset viewer.