Ignore:
Timestamp:
10/08/13 12:41:19 (11 years ago)
Author:
stoda
Message:
 
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/common/broker/Broker.java

    r95 r99  
    2828import com.rabbitmq.client.Connection;
    2929import com.rabbitmq.client.QueueingConsumer;
     30import com.rabbitmq.client.AMQP.BasicProperties;
    3031import com.rabbitmq.client.QueueingConsumer.Delivery;
    3132import com.rabbitmq.client.ShutdownListener;
     
    129130         * @throws Exception
    130131         */
    131         public Channel getChannel() throws Exception {
     132        public synchronized Channel getChannel() throws Exception {
    132133                return channel;
    133134        }
     
    139140         * @throws IOException
    140141         */
    141         public Channel getNewChannel() throws IOException {
     142        public synchronized Channel getNewChannel() throws IOException {
    142143                return connection.createChannel();
     144        }
     145
     146        /**
     147         * Publishes a request using a thread safe channel -if the channel is closed
     148         * it tries to open it once
     149         *
     150         * @param exchange
     151         * @param routingKey
     152         * @param props
     153         * @param bytesRequest
     154         * @throws IOException
     155         */
     156        public synchronized void publishMessge(String exchange, String routingKey, BasicProperties props, byte[] bytesRequest) throws IOException {
     157                if (!channel.isOpen()) {
     158                        logger.error("Broker's channel is closed opening a new one", channel.getCloseReason());
     159                        channel = getNewChannel();
     160                }
     161                channel.basicPublish(exchange, routingKey, props, bytesRequest);
    143162        }
    144163
Note: See TracChangeset for help on using the changeset viewer.