Changeset 54 for trunk/src/main/java/omq
- Timestamp:
- 06/21/13 12:42:25 (11 years ago)
- Location:
- trunk/src/main/java/omq
- Files:
-
- 1 added
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/client/listener/ResponseListener.java
r53 r54 55 55 @Override 56 56 public void run() { 57 logger.info("ResponseListener started"); 57 58 Delivery delivery; 58 59 String uid_request; … … 122 123 123 124 channel.queueDeclare(reply_queue, durable, false, false, args); 125 logger.info("ResponseListener creating queue: " + reply_queue + ", durable: " + durable); 124 126 125 127 // Declare a new consumer -
trunk/src/main/java/omq/client/proxy/Proxymq.java
r53 r54 15 15 import omq.Remote; 16 16 import omq.client.annotation.AsyncMethod; 17 import omq.client.annotation.MultiMethod; 17 18 import omq.client.annotation.SyncMethod; 18 19 import omq.client.listener.ResponseListener; … … 133 134 Request request = createRequest(method, arguments); 134 135 135 // Log.saveTimeSendRequestLog("Client-time-request", request.getId(),136 // method.getName(), timeStart);137 138 136 Object response = null; 139 137 // Publish the request … … 144 142 logger.debug("Publish sync request -> " + request.getId()); 145 143 response = publishSyncRequest(request, method.getReturnType()); 146 147 // long timeEnd = (new Date()).getTime();148 // Log.saveTimeSendRequestLog("Client-time-response",149 // request.getId(), method.getName(), timeEnd);150 144 } 151 145 … … 168 162 // channel.basicPublish(exchange, routingkey, props, bytesRequest); 169 163 broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest); 170 // Log.saveLog("Client-Serialize", bytesRequest);171 164 } 172 165 … … 203 196 String corrId = java.util.UUID.randomUUID().toString(); 204 197 String methodName = method.getName(); 198 boolean multi = false; 199 200 if (method.getAnnotation(MultiMethod.class) != null) { 201 multi = true; 202 } 205 203 206 204 // Since we need to know whether the method is async and if it has to … … 217 215 return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout); 218 216 } else { 219 return Request.newAsyncRequest(corrId, methodName, arguments );217 return Request.newAsyncRequest(corrId, methodName, arguments, multi); 220 218 } 221 219 } … … 237 235 } 238 236 resp = serializer.deserializeResponse(results.get(corrId), type); 239 // Log.saveLog("Client-Deserialize", results.get(corrId));240 237 241 238 // Remove and indicate the key exists (a hashmap can contain a null -
trunk/src/main/java/omq/common/broker/Broker.java
r53 r54 176 176 if (responseListener == null) { 177 177 responseListener = new ResponseListener(this); 178 responseListener.start(); 178 179 } 179 180 if (eventDispatcher == null) { 180 181 eventDispatcher = new EventDispatcher(this); 182 eventDispatcher.start(); 181 183 } 182 184 } … … 197 199 byte[] bytesResponse = serializer.serialize(wrapper); 198 200 channel.basicPublish(UID, "", null, bytesResponse); 199 200 // Log.saveLog("Server-Serialize", bytesResponse);201 201 } 202 202 -
trunk/src/main/java/omq/common/event/EventDispatcher.java
r53 r54 56 56 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false")); 57 57 channel.queueDeclare(event_queue, durable, false, false, null); 58 logger.info("EventDispatcher creating queue: " + event_queue + ", durable: " + durable); 58 59 59 60 // Declare a new consumer … … 72 73 @Override 73 74 public void run() { 75 logger.info("EventDispatcher started"); 74 76 Delivery delivery; 75 77 Event event; … … 84 86 85 87 logger.info("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId()); 86 // Log.saveLog("Client-Deserialize", delivery.getBody());87 88 // long timeEnd = (new Date()).getTime();89 // Log.saveTimeSendRequestLog("Client-time-response",90 // event.getCorrId(), "Event!", timeEnd);91 88 92 89 // Dispatch it -
trunk/src/main/java/omq/common/message/Request.java
r44 r54 15 15 private boolean async = false; 16 16 17 private transient boolean multi; 17 18 private transient long timeout; 18 19 private transient int retries; … … 34 35 } 35 36 37 public Request(String id2, String method2, boolean b, Object[] params2, boolean multi2) { 38 // TODO Auto-generated constructor stub 39 } 40 36 41 public static Request newSyncRequest(String id, String method, Object[] params) { 37 42 return new Request(id, method, false, params); … … 45 50 } 46 51 47 public static Request newAsyncRequest(String id, String method, Object[] params ) {48 return new Request(id, method, true, params );52 public static Request newAsyncRequest(String id, String method, Object[] params, boolean multi) { 53 return new Request(id, method, true, params, multi); 49 54 } 50 55 … … 97 102 } 98 103 104 public boolean isMulti() { 105 return multi; 106 } 107 108 public void setMulti(boolean multi) { 109 this.multi = multi; 110 } 99 111 } -
trunk/src/main/java/omq/server/InvocationThread.java
r53 r54 44 44 // Deserialize the json 45 45 Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj); 46 // Log.saveLog("Server-Deserialize", delivery.getBody());47 48 46 String methodName = request.getMethod(); 49 47 String requestID = request.getId(); … … 77 75 byte[] bytesResponse = serializer.serialize(serializerType, resp); 78 76 channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse); 79 80 // Log.saveLog("Server-Serialize", bytesResponse);77 logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: " 78 + props.getReplyTo()); 81 79 } 82 80 -
trunk/src/main/java/omq/server/RemoteObject.java
r53 r54 35 35 36 36 private static final long serialVersionUID = -1778953938739846450L; 37 private static final String multi = "multi#"; 37 38 private static final Logger logger = Logger.getLogger(RemoteObject.class.getName()); 38 39 … … 230 231 logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue); 231 232 channel.exchangeDeclare(exchange, "direct"); 233 channel.exchangeDeclare(multi + exchange, "fanout"); 232 234 channel.queueDeclare(queue, durable, false, false, null); 233 235 channel.queueBind(queue, exchange, routingKey); 236 channel.queueBind(queue, multi + exchange, routingKey); 234 237 235 238 // Declare the event topic fanout
Note: See TracChangeset
for help on using the changeset viewer.