- Timestamp:
- 10/16/13 13:01:04 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java
r99 r104 4 4 import java.lang.reflect.InvocationHandler; 5 5 import java.lang.reflect.Method; 6 import java.util.ArrayList; 6 7 import java.util.HashMap; 8 import java.util.List; 7 9 import java.util.Map; 8 10 import java.util.Properties; … … 201 203 publishMessage(request, replyQueueName); 202 204 if (request.isMulti()) { 203 return getResults(corrId, request.getWait(),timeout, type);205 return getResults(corrId, timeout, type); 204 206 } else { 205 207 return getResult(corrId, timeout, type); … … 228 230 String methodName = method.getName(); 229 231 boolean multi = false; 230 int wait = 0;231 232 232 233 if (method.getAnnotation(MultiMethod.class) != null) { 233 234 multi = true; 234 wait = method.getAnnotation(MultiMethod.class).waitNum();235 235 } 236 236 … … 246 246 timeout = sync.timeout(); 247 247 } 248 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi , wait);248 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi); 249 249 } else { 250 250 return Request.newAsyncRequest(corrId, methodName, arguments, multi); … … 291 291 * @param corrId 292 292 * - Correlation Id of the request 293 * @param wait294 * - Array length295 293 * @param timeout 296 294 * - Timeout read in @SyncMethod.timeout(). If the timeout is set … … 302 300 * @throws Exception 303 301 */ 304 private Object getResults(String corrId, int wait,long timeout, Class<?> type) throws Exception {302 private Object getResults(String corrId, long timeout, Class<?> type) throws Exception { 305 303 Response resp = null; 306 304 // Get the component type of an array 307 305 Class<?> actualType = type.getComponentType(); 308 306 309 Object array = Array.newInstance(actualType, wait);307 List<Object> list = new ArrayList<Object>(); 310 308 311 309 int i = 0; … … 313 311 long start = System.currentTimeMillis(); 314 312 315 while ( i < wait) {313 while (true) { 316 314 synchronized (results) { 317 315 // Due to we are using notifyAll(), we need to control the real … … 322 320 } 323 321 if ((timeout - localTimeout) <= 0) { 324 throw new TimeoutException("Timeout exception time: " + timeout);322 break; 325 323 } 326 324 // Remove the corrId to receive new replies 327 325 resp = serializer.deserializeResponse(results.remove(corrId), actualType); 328 Array.set(array, i,resp.getResult());326 list.add(resp.getResult()); 329 327 } 330 328 i++; 331 329 } 330 331 if (i == 0) { 332 results.remove(corrId); 333 throw new TimeoutException("Timeout exception time: " + timeout); 334 } 335 332 336 synchronized (results) { 333 337 results.put(corrId, null); 338 } 339 340 Object array = Array.newInstance(actualType, i); 341 i = 0; 342 for (Object o : list) { 343 Array.set(array, i++, o); 334 344 } 335 345
Note: See TracChangeset
for help on using the changeset viewer.