Changeset 98


Ignore:
Timestamp:
10/08/13 10:49:46 (11 years ago)
Author:
stoda
Message:

0.5.6
Synchronized channel and reopening when they are closed

Location:
trunk
Files:
1 added
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/pom.xml

    r84 r98  
    44        <groupId>objectmq</groupId>
    55        <artifactId>objectmq</artifactId>
    6         <version>0.5.5</version>
     6        <version>0.5.6</version>
    77        <name>objectmq</name>
    88        <description>Middleware based on AMQP</description>
  • trunk/src/main/java/omq/client/proxy/MultiProxymq.java

    r83 r98  
    5555
    5656                byte[] bytesRequest = serializer.serialize(serializerType, request);
    57                 broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
     57                broker.publishMessge(exchange, routingkey, props, bytesRequest);
    5858
    5959                logger.debug("Proxymq: " + uid + " invokes " + methodName + ", corrID" + corrId + ", exchange: " + exchange + ", replyQueue: " + replyQueueName
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r84 r98  
    165165                // Publish the message
    166166                byte[] bytesRequest = serializer.serialize(serializerType, request);
    167                 broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
     167                broker.publishMessge(exchange, routingkey, props, bytesRequest);
    168168                logger.debug("Proxymq: " + uid + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: "
    169169                                + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()
  • trunk/src/main/java/omq/common/broker/Broker.java

    r83 r98  
    2727import com.rabbitmq.client.Connection;
    2828import com.rabbitmq.client.QueueingConsumer;
     29import com.rabbitmq.client.AMQP.BasicProperties;
    2930import com.rabbitmq.client.QueueingConsumer.Delivery;
    3031import com.rabbitmq.client.ShutdownListener;
     
    125126         * @throws Exception
    126127         */
    127         public Channel getChannel() throws Exception {
     128        public synchronized Channel getChannel() throws Exception {
    128129                return channel;
    129130        }
     
    135136         * @throws IOException
    136137         */
    137         public Channel getNewChannel() throws IOException {
     138        public synchronized Channel getNewChannel() throws IOException {
    138139                return connection.createChannel();
     140        }
     141
     142        /**
     143         *
     144         */
     145        public synchronized void publishMessge(String exchange, String routingKey, BasicProperties props, byte[] bytesRequest) throws IOException {
     146                if (!channel.isOpen()) {
     147                        logger.error("Broker's channel is closed opening a new one", channel.getCloseReason());
     148                        channel = getNewChannel();
     149                }
     150                channel.basicPublish(exchange, routingKey, props, bytesRequest);
    139151        }
    140152
Note: See TracChangeset for help on using the changeset viewer.