Changeset 99
- Timestamp:
- 10/08/13 12:41:19 (11 years ago)
- Location:
- branches/supervisor
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/pom.xml
r84 r99 4 4 <groupId>objectmq</groupId> 5 5 <artifactId>objectmq</artifactId> 6 <version>0. 5.5</version>6 <version>0.6.0</version> 7 7 <name>objectmq</name> 8 8 <description>Middleware based on AMQP</description> -
branches/supervisor/src/main/java/omq/client/proxy/MultiProxymq.java
r83 r99 55 55 56 56 byte[] bytesRequest = serializer.serialize(serializerType, request); 57 broker. getChannel().basicPublish(exchange, routingkey, props, bytesRequest);57 broker.publishMessge(exchange, routingkey, props, bytesRequest); 58 58 59 59 logger.debug("Proxymq: " + uid + " invokes " + methodName + ", corrID" + corrId + ", exchange: " + exchange + ", replyQueue: " + replyQueueName -
branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java
r92 r99 172 172 // Publish the message 173 173 byte[] bytesRequest = serializer.serialize(serializerType, request); 174 broker. getChannel().basicPublish(exchange, routingkey, props, bytesRequest);174 broker.publishMessge(exchange, routingkey, props, bytesRequest); 175 175 logger.debug("Proxymq: " + uid + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: " 176 176 + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync() -
branches/supervisor/src/main/java/omq/common/broker/Broker.java
r95 r99 28 28 import com.rabbitmq.client.Connection; 29 29 import com.rabbitmq.client.QueueingConsumer; 30 import com.rabbitmq.client.AMQP.BasicProperties; 30 31 import com.rabbitmq.client.QueueingConsumer.Delivery; 31 32 import com.rabbitmq.client.ShutdownListener; … … 129 130 * @throws Exception 130 131 */ 131 public Channel getChannel() throws Exception {132 public synchronized Channel getChannel() throws Exception { 132 133 return channel; 133 134 } … … 139 140 * @throws IOException 140 141 */ 141 public Channel getNewChannel() throws IOException {142 public synchronized Channel getNewChannel() throws IOException { 142 143 return connection.createChannel(); 144 } 145 146 /** 147 * Publishes a request using a thread safe channel -if the channel is closed 148 * it tries to open it once 149 * 150 * @param exchange 151 * @param routingKey 152 * @param props 153 * @param bytesRequest 154 * @throws IOException 155 */ 156 public synchronized void publishMessge(String exchange, String routingKey, BasicProperties props, byte[] bytesRequest) throws IOException { 157 if (!channel.isOpen()) { 158 logger.error("Broker's channel is closed opening a new one", channel.getCloseReason()); 159 channel = getNewChannel(); 160 } 161 channel.basicPublish(exchange, routingKey, props, bytesRequest); 143 162 } 144 163
Note: See TracChangeset
for help on using the changeset viewer.