Ignore:
Timestamp:
10/02/13 17:31:26 (11 years ago)
Author:
stoda
Message:

ObjectMQ without RemoteWrapper? - one consumer per thread -

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/server/InvocationThread.java

    r91 r96  
    11package omq.server;
    22
     3import java.io.IOException;
    34import java.lang.reflect.InvocationTargetException;
    4 import java.util.concurrent.BlockingQueue;
    5 
     5import java.util.Properties;
     6
     7import omq.common.broker.Broker;
    68import omq.common.message.Request;
    79import omq.common.message.Response;
     10import omq.common.util.ParameterQueue;
    811import omq.common.util.Serializer;
    912import omq.exception.OmqException;
     13import omq.exception.SerializerException;
    1014
    1115import org.apache.log4j.Logger;
     
    1317import com.rabbitmq.client.AMQP.BasicProperties;
    1418import com.rabbitmq.client.Channel;
     19import com.rabbitmq.client.ConsumerCancelledException;
     20import com.rabbitmq.client.QueueingConsumer;
    1521import com.rabbitmq.client.QueueingConsumer.Delivery;
     22import com.rabbitmq.client.ShutdownSignalException;
    1623
    1724/**
     
    2229 */
    2330public class InvocationThread extends Thread {
     31
    2432        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
     33        private static final String multi = "multi#";
     34
     35        // RemoteObject
    2536        private RemoteObject obj;
     37        private String UID;
     38        private Properties env;
     39
     40        // Broker
     41        private Broker broker;
    2642        private Serializer serializer;
    27         // private RemoteWrapper wrapper;
    28         private BlockingQueue<Delivery> deliveryQueue;
     43
     44        // Consumer
     45        private Channel channel;
     46        private QueueingConsumer consumer;
     47        private String multiQueue;
    2948        private boolean killed = false;
    3049
    31         public InvocationThread(RemoteObject obj, RemoteWrapper wrapper, Serializer serializer) {
     50        public InvocationThread(RemoteObject obj, Broker broker) throws Exception {
    3251                this.obj = obj;
    33                 // this.wrapper = wrapper;
    34                 this.deliveryQueue = wrapper.getDeliveryQueue();
    35                 this.serializer = serializer;
     52                this.UID = obj.getRef();
     53                this.env = obj.getEnv();
     54                this.broker = broker;
     55                this.serializer = broker.getSerializer();
     56        }
     57
     58        @Override
     59        public synchronized void start() {
     60                try {
     61                        startQueues();
     62                        super.start();
     63                } catch (Exception e) {
     64                        logger.error("Cannot start a remoteObject", e);
     65                }
     66
    3667        }
    3768
     
    4172                        try {
    4273                                // Get the delivery
    43                                 Delivery delivery = deliveryQueue.take();
    44 
    45                                 // // Indicate this thread is not available
    46                                 // wrapper.increaseBusy();
     74                                Delivery delivery = consumer.nextDelivery();
    4775
    4876                                String serializerType = delivery.getProperties().getType();
     
    6997                                }
    7098
    71                                
    72                                 Channel channel = obj.getChannel();
    73                                
    74                                
    7599                                // Reply if it's necessary
    76100                                if (!request.isAsync()) {
    77101                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
    78 
    79                                        
    80102
    81103                                        BasicProperties props = delivery.getProperties();
     
    88110                                                        + props.getReplyTo());
    89111                                }
    90                                
     112
    91113                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    92                                
    93                                 // // Indicate this thread is available
    94                                 // wrapper.decreaseBusy();
    95114                        } catch (InterruptedException i) {
    96115                                logger.error(i);
    97                                 killed = true;
     116                        } catch (ShutdownSignalException e) {
     117                                logger.error(e);
     118                                try {
     119                                        if (channel.isOpen()) {
     120                                                channel.close();
     121                                        }
     122                                        startQueues();
     123                                } catch (Exception e1) {
     124                                        try {
     125                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
     126                                                Thread.sleep(milis);
     127                                        } catch (InterruptedException e2) {
     128                                                logger.error(e2);
     129                                        }
     130                                        logger.error(e1);
     131                                }
     132                        } catch (ConsumerCancelledException e) {
     133                                logger.error(e);
     134                        } catch (SerializerException e) {
     135                                logger.error(e);
    98136                        } catch (Exception e) {
    99                                 logger.error("Object: " + obj.getRef(), e);
     137                                logger.error(e);
    100138                        }
    101139
    102140                }
     141        }
     142
     143        /**
     144         * This method starts the queues using the information got in the
     145         * environment.
     146         *
     147         * @throws Exception
     148         */
     149        private void startQueues() throws Exception {
     150                // Start channel
     151                channel = broker.getNewChannel();
     152
     153                /*
     154                 * Default queue, Round Robin behaviour
     155                 */
     156
     157                // Get info about which exchange and queue will use
     158                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
     159                String queue = UID;
     160                String routingKey = UID;
     161
     162                // RemoteObject default queue
     163                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
     164                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
     165                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
     166
     167                // Declares and bindings
     168                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
     169                        channel.exchangeDeclare(exchange, "direct");
     170                }
     171                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
     172                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
     173                        channel.queueBind(queue, exchange, routingKey);
     174                }
     175                logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
     176                                + exclusive + ", AutoDelete: " + autoDelete);
     177
     178                /*
     179                 * Multi queue, exclusive per each instance
     180                 */
     181
     182                // Get info about the multiQueue
     183                String multiExchange = multi + UID;
     184                // TODO:String multiExchange = multi + exchange + UID;
     185                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
     186
     187                // Multi queue (exclusive queue per remoteObject)
     188                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
     189                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
     190                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
     191
     192                // Declares and bindings
     193                channel.exchangeDeclare(multiExchange, "fanout");
     194                if (multiQueue == null) {
     195                        multiQueue = channel.queueDeclare().getQueue();
     196                } else {
     197                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
     198                }
     199                channel.queueBind(multiQueue, multiExchange, "");
     200                logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
     201                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
     202
     203                /*
     204                 * Consumer
     205                 */
     206
     207                boolean autoAck = false;
     208
     209                int prefetchCount = 1;
     210                channel.basicQos(prefetchCount);
     211
     212                // Declare a new consumer
     213                consumer = new QueueingConsumer(channel);
     214                channel.basicConsume(queue, autoAck, consumer);
     215                channel.basicConsume(multiQueue, autoAck, consumer);
     216        }
     217
     218        public void kill() throws IOException {
     219                logger.info("Killing objectmq: " + UID + " thread id");
     220                killed = true;
     221                interrupt();
     222                channel.close();
    103223        }
    104224
     
    111231        }
    112232
    113         public BlockingQueue<Delivery> getDeliveryQueue() {
    114                 return deliveryQueue;
    115         }
    116 
    117         public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
    118                 this.deliveryQueue = deliveryQueue;
    119         }
    120233}
Note: See TracChangeset for help on using the changeset viewer.