Changeset 53 for trunk/src/main/java/omq/client
- Timestamp:
- 06/20/13 16:57:39 (11 years ago)
- Location:
- trunk/src/main/java/omq/client
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/client/listener/ResponseListener.java
r50 r53 28 28 */ 29 29 public class ResponseListener extends Thread { 30 private static final Logger logger = Logger.getLogger(ResponseListener.class.getName()); 31 private static ResponseListener rListener; 30 private final Logger logger = Logger.getLogger(ResponseListener.class.getName()); 32 31 32 private Broker broker; 33 33 private Channel channel; 34 34 private QueueingConsumer consumer; … … 43 43 * @throws Exception 44 44 */ 45 protected ResponseListener(Properties env) throws Exception { 46 this.env = env; 45 public ResponseListener(Broker broker) throws Exception { 46 this.broker = broker; 47 env = broker.getEnvironment(); 47 48 48 49 // Init the hashtable (it's concurrent) 49 this.results = new Hashtable<String, Map<String, byte[]>>();50 results = new Hashtable<String, Map<String, byte[]>>(); 50 51 51 52 startRPCQueue(); … … 107 108 108 109 private void startRPCQueue() throws Exception { 109 channel = Broker.getNewChannel();110 channel = broker.getNewChannel(); 110 111 111 112 Map<String, Object> args = null; … … 128 129 129 130 /** 130 * Static function which initializes the ResponseListener131 *132 * @param env133 * @throws Exception134 */135 public static void init(Properties env) throws Exception {136 if (rListener == null) {137 rListener = new ResponseListener(env);138 rListener.start();139 } else {140 throw new Exception("Cannot init because it already exists");141 }142 }143 144 /**145 * Method to retrieve the unique ResponseListener, this function can also146 * initialize a ResponseListener using and environment147 *148 * @param env149 * @return unique ResponseListener150 * @throws Exception151 */152 public static ResponseListener getRequestListener(Properties env) throws Exception {153 if (rListener == null) {154 rListener = new ResponseListener(env);155 rListener.start();156 } else {157 // TODO: create a new exception to indicate that a response listener158 // cannot be init159 throw new Exception("Cannot init because it already exists");160 }161 return rListener;162 }163 164 public static boolean isVoid() {165 return rListener == null;166 }167 168 /**169 * Method to retrieve the unique ResponseListener170 *171 * @return172 * @throws Exception173 */174 public static ResponseListener getRequestListener() throws Exception {175 if (rListener == null) {176 throw new Exception("Request listener not initialized");177 }178 return rListener;179 }180 181 /**182 131 * 183 132 * @param key … … 186 135 public boolean containsKey(String key) { 187 136 return results.containsKey(key); 188 }189 190 /**191 * This method is used to kill the unique responseListener in the system192 *193 * @throws Exception194 */195 public static void stopResponseListner() throws Exception {196 rListener.kill();197 rListener = null;198 137 } 199 138 -
trunk/src/main/java/omq/client/proxy/Proxymq.java
r49 r53 51 51 private String uid; 52 52 private transient String serializerType; 53 private transient Broker broker; 53 54 private transient ResponseListener rListener; 54 55 private transient EventDispatcher dispatcher; 56 private transient Serializer serializer; 55 57 // private transient Channel channel; 56 58 private transient Properties env; … … 85 87 * @throws Exception 86 88 */ 87 public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception {89 public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception { 88 90 this.uid = uid; 89 this.rListener = ResponseListener.getRequestListener(); 90 this.dispatcher = EventDispatcher.getDispatcher(); 91 this.broker = broker; 92 rListener = broker.getResponseListener(); 93 dispatcher = broker.getEventDispatcher(); 94 serializer = broker.getSerializer(); 91 95 92 96 // TODO what is better to use a new channel or to use the same? 93 97 // this.channel = Broker.getChannel(); 94 this.env = env;98 env = broker.getEnvironment(); 95 99 96 100 // set the serializer type … … 160 164 161 165 // Publish the message 162 byte[] bytesRequest = Serializer.serialize(serializerType, request);166 byte[] bytesRequest = serializer.serialize(serializerType, request); 163 167 // TODO See this 164 168 // channel.basicPublish(exchange, routingkey, props, bytesRequest); 165 Broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);169 broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest); 166 170 // Log.saveLog("Client-Serialize", bytesRequest); 167 171 } … … 232 236 throw new TimeoutException("Timeout exception time: " + timeout); 233 237 } 234 resp = Serializer.deserializeResponse(results.get(corrId), type);238 resp = serializer.deserializeResponse(results.get(corrId), type); 235 239 // Log.saveLog("Client-Deserialize", results.get(corrId)); 236 240
Note: See TracChangeset
for help on using the changeset viewer.