Changeset 99


Ignore:
Timestamp:
10/08/13 12:41:19 (11 years ago)
Author:
stoda
Message:
 
Location:
branches/supervisor
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/pom.xml

    r84 r99  
    44        <groupId>objectmq</groupId>
    55        <artifactId>objectmq</artifactId>
    6         <version>0.5.5</version>
     6        <version>0.6.0</version>
    77        <name>objectmq</name>
    88        <description>Middleware based on AMQP</description>
  • branches/supervisor/src/main/java/omq/client/proxy/MultiProxymq.java

    r83 r99  
    5555
    5656                byte[] bytesRequest = serializer.serialize(serializerType, request);
    57                 broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
     57                broker.publishMessge(exchange, routingkey, props, bytesRequest);
    5858
    5959                logger.debug("Proxymq: " + uid + " invokes " + methodName + ", corrID" + corrId + ", exchange: " + exchange + ", replyQueue: " + replyQueueName
  • branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java

    r92 r99  
    172172                // Publish the message
    173173                byte[] bytesRequest = serializer.serialize(serializerType, request);
    174                 broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
     174                broker.publishMessge(exchange, routingkey, props, bytesRequest);
    175175                logger.debug("Proxymq: " + uid + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: "
    176176                                + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()
  • branches/supervisor/src/main/java/omq/common/broker/Broker.java

    r95 r99  
    2828import com.rabbitmq.client.Connection;
    2929import com.rabbitmq.client.QueueingConsumer;
     30import com.rabbitmq.client.AMQP.BasicProperties;
    3031import com.rabbitmq.client.QueueingConsumer.Delivery;
    3132import com.rabbitmq.client.ShutdownListener;
     
    129130         * @throws Exception
    130131         */
    131         public Channel getChannel() throws Exception {
     132        public synchronized Channel getChannel() throws Exception {
    132133                return channel;
    133134        }
     
    139140         * @throws IOException
    140141         */
    141         public Channel getNewChannel() throws IOException {
     142        public synchronized Channel getNewChannel() throws IOException {
    142143                return connection.createChannel();
     144        }
     145
     146        /**
     147         * Publishes a request using a thread safe channel -if the channel is closed
     148         * it tries to open it once
     149         *
     150         * @param exchange
     151         * @param routingKey
     152         * @param props
     153         * @param bytesRequest
     154         * @throws IOException
     155         */
     156        public synchronized void publishMessge(String exchange, String routingKey, BasicProperties props, byte[] bytesRequest) throws IOException {
     157                if (!channel.isOpen()) {
     158                        logger.error("Broker's channel is closed opening a new one", channel.getCloseReason());
     159                        channel = getNewChannel();
     160                }
     161                channel.basicPublish(exchange, routingKey, props, bytesRequest);
    143162        }
    144163
Note: See TracChangeset for help on using the changeset viewer.