Changeset 58 for trunk/src/main/java/omq/client/proxy
- Timestamp:
- 06/25/13 16:38:31 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/client/proxy/Proxymq.java
r57 r58 2 2 3 3 import java.io.IOException; 4 import java.lang.reflect.Array; 4 5 import java.lang.reflect.InvocationHandler; 5 6 import java.lang.reflect.Method; 6 7 import java.lang.reflect.Proxy; 7 import java.util.ArrayList;8 8 import java.util.Collection; 9 9 import java.util.HashMap; 10 10 import java.util.Hashtable; 11 import java.util.List;12 11 import java.util.Map; 13 12 import java.util.Properties; 14 15 import org.apache.log4j.Logger;16 13 17 14 import omq.Remote; … … 34 31 import omq.exception.TimeoutException; 35 32 33 import org.apache.log4j.Logger; 34 36 35 import com.rabbitmq.client.AMQP.BasicProperties; 37 36 … … 190 189 try { 191 190 publishMessage(request, replyQueueName); 192 return getResult(corrId, timeout, type); 191 if (request.isMulti()) { 192 return getResults(corrId, 2, timeout, type); 193 } else { 194 return getResult(corrId, timeout, type); 195 } 196 193 197 } catch (TimeoutException te) { 194 198 logger.error(te); … … 203 207 String methodName = method.getName(); 204 208 boolean multi = false; 209 int wait = 0; 205 210 206 211 if (method.getAnnotation(MultiMethod.class) != null) { 207 212 multi = true; 213 wait = method.getAnnotation(MultiMethod.class).waitNum(); 208 214 } 209 215 … … 219 225 timeout = sync.timeout(); 220 226 } 221 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi );227 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait); 222 228 } else { 223 229 return Request.newAsyncRequest(corrId, methodName, arguments, multi); … … 229 235 230 236 // Wait for the results. 231 long localTimeout = 0;237 long localTimeout = timeout; 232 238 long start = System.currentTimeMillis(); 233 239 synchronized (results) { 234 240 // Due to we are using notifyAll(), we need to control the real time 235 241 while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) { 236 results.wait( timeout);242 results.wait(localTimeout); 237 243 localTimeout = System.currentTimeMillis() - start; 238 244 } … … 258 264 } 259 265 260 @SuppressWarnings("unused")261 266 private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception { 262 267 Response resp = null; 263 List<Object> list = new ArrayList<Object>(); 268 Class<?> actualType = type.getComponentType(); 269 270 Object array = Array.newInstance(actualType, wait); 264 271 265 272 int i = 0; … … 279 286 } 280 287 // Remove the corrId to receive new replies 281 resp = serializer.deserializeResponse(results.remove(corrId), type);282 list.add(resp.getResult());283 288 resp = serializer.deserializeResponse(results.remove(corrId), actualType); 289 System.out.println("/n/n/n/n/nResult type: "+resp.getResult()+" /n/n/n/n/n"); 290 Array.set(array, i, resp.getResult()); 284 291 } 285 292 i++; … … 289 296 } 290 297 291 return list;298 return array; 292 299 } 293 300
Note: See TracChangeset
for help on using the changeset viewer.