Changeset 98 for trunk/src/main
- Timestamp:
- 10/08/13 10:49:46 (11 years ago)
- Location:
- trunk/src/main/java/omq
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/client/proxy/MultiProxymq.java
r83 r98 55 55 56 56 byte[] bytesRequest = serializer.serialize(serializerType, request); 57 broker. getChannel().basicPublish(exchange, routingkey, props, bytesRequest);57 broker.publishMessge(exchange, routingkey, props, bytesRequest); 58 58 59 59 logger.debug("Proxymq: " + uid + " invokes " + methodName + ", corrID" + corrId + ", exchange: " + exchange + ", replyQueue: " + replyQueueName -
trunk/src/main/java/omq/client/proxy/Proxymq.java
r84 r98 165 165 // Publish the message 166 166 byte[] bytesRequest = serializer.serialize(serializerType, request); 167 broker. getChannel().basicPublish(exchange, routingkey, props, bytesRequest);167 broker.publishMessge(exchange, routingkey, props, bytesRequest); 168 168 logger.debug("Proxymq: " + uid + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: " 169 169 + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync() -
trunk/src/main/java/omq/common/broker/Broker.java
r83 r98 27 27 import com.rabbitmq.client.Connection; 28 28 import com.rabbitmq.client.QueueingConsumer; 29 import com.rabbitmq.client.AMQP.BasicProperties; 29 30 import com.rabbitmq.client.QueueingConsumer.Delivery; 30 31 import com.rabbitmq.client.ShutdownListener; … … 125 126 * @throws Exception 126 127 */ 127 public Channel getChannel() throws Exception {128 public synchronized Channel getChannel() throws Exception { 128 129 return channel; 129 130 } … … 135 136 * @throws IOException 136 137 */ 137 public Channel getNewChannel() throws IOException {138 public synchronized Channel getNewChannel() throws IOException { 138 139 return connection.createChannel(); 140 } 141 142 /** 143 * 144 */ 145 public synchronized void publishMessge(String exchange, String routingKey, BasicProperties props, byte[] bytesRequest) throws IOException { 146 if (!channel.isOpen()) { 147 logger.error("Broker's channel is closed opening a new one", channel.getCloseReason()); 148 channel = getNewChannel(); 149 } 150 channel.basicPublish(exchange, routingKey, props, bytesRequest); 139 151 } 140 152
Note: See TracChangeset
for help on using the changeset viewer.