Changeset 58 for trunk/src/main/java
- Timestamp:
- 06/25/13 16:38:31 (11 years ago)
- Location:
- trunk/src/main/java/omq
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/client/proxy/Proxymq.java
r57 r58 2 2 3 3 import java.io.IOException; 4 import java.lang.reflect.Array; 4 5 import java.lang.reflect.InvocationHandler; 5 6 import java.lang.reflect.Method; 6 7 import java.lang.reflect.Proxy; 7 import java.util.ArrayList;8 8 import java.util.Collection; 9 9 import java.util.HashMap; 10 10 import java.util.Hashtable; 11 import java.util.List;12 11 import java.util.Map; 13 12 import java.util.Properties; 14 15 import org.apache.log4j.Logger;16 13 17 14 import omq.Remote; … … 34 31 import omq.exception.TimeoutException; 35 32 33 import org.apache.log4j.Logger; 34 36 35 import com.rabbitmq.client.AMQP.BasicProperties; 37 36 … … 190 189 try { 191 190 publishMessage(request, replyQueueName); 192 return getResult(corrId, timeout, type); 191 if (request.isMulti()) { 192 return getResults(corrId, 2, timeout, type); 193 } else { 194 return getResult(corrId, timeout, type); 195 } 196 193 197 } catch (TimeoutException te) { 194 198 logger.error(te); … … 203 207 String methodName = method.getName(); 204 208 boolean multi = false; 209 int wait = 0; 205 210 206 211 if (method.getAnnotation(MultiMethod.class) != null) { 207 212 multi = true; 213 wait = method.getAnnotation(MultiMethod.class).waitNum(); 208 214 } 209 215 … … 219 225 timeout = sync.timeout(); 220 226 } 221 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi );227 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait); 222 228 } else { 223 229 return Request.newAsyncRequest(corrId, methodName, arguments, multi); … … 229 235 230 236 // Wait for the results. 231 long localTimeout = 0;237 long localTimeout = timeout; 232 238 long start = System.currentTimeMillis(); 233 239 synchronized (results) { 234 240 // Due to we are using notifyAll(), we need to control the real time 235 241 while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) { 236 results.wait( timeout);242 results.wait(localTimeout); 237 243 localTimeout = System.currentTimeMillis() - start; 238 244 } … … 258 264 } 259 265 260 @SuppressWarnings("unused")261 266 private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception { 262 267 Response resp = null; 263 List<Object> list = new ArrayList<Object>(); 268 Class<?> actualType = type.getComponentType(); 269 270 Object array = Array.newInstance(actualType, wait); 264 271 265 272 int i = 0; … … 279 286 } 280 287 // Remove the corrId to receive new replies 281 resp = serializer.deserializeResponse(results.remove(corrId), type);282 list.add(resp.getResult());283 288 resp = serializer.deserializeResponse(results.remove(corrId), actualType); 289 System.out.println("/n/n/n/n/nResult type: "+resp.getResult()+" /n/n/n/n/n"); 290 Array.set(array, i, resp.getResult()); 284 291 } 285 292 i++; … … 289 296 } 290 297 291 return list;298 return array; 292 299 } 293 300 -
trunk/src/main/java/omq/common/message/Request.java
r55 r58 16 16 17 17 private transient boolean multi; 18 private transient int wait; 18 19 private transient long timeout; 19 20 private transient int retries; … … 47 48 } 48 49 49 public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi ) {50 public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi, int wait) { 50 51 Request req = new Request(id, method, false, params, multi); 51 52 req.setRetries(retries); 52 53 req.setTimeout(timeout); 54 req.setWait(wait); 53 55 return req; 54 56 } … … 113 115 this.multi = multi; 114 116 } 117 118 public int getWait() { 119 return wait; 120 } 121 122 public void setWait(int wait) { 123 this.wait = wait; 124 } 115 125 } -
trunk/src/main/java/omq/server/RemoteObject.java
r57 r58 39 39 40 40 private String UID; 41 private String multiQueue; 41 42 private Properties env; 42 43 private transient Broker broker; … … 66 67 this.broker = broker; 67 68 UID = reference; 69 multiQueue = UID + System.currentTimeMillis(); 68 70 env = broker.getEnvironment(); 69 71 serializer = broker.getSerializer(); … … 225 227 // Multi info 226 228 String multiExchange = multi + exchange; 227 //TODO: change this228 String multiQueue = UID + System.currentTimeMillis();229 229 230 230 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
Note: See TracChangeset
for help on using the changeset viewer.