Ignore:
Timestamp:
10/02/13 17:31:26 (11 years ago)
Author:
stoda
Message:

ObjectMQ without RemoteWrapper? - one consumer per thread -

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/server/RemoteObject.java

    r92 r96  
    1212import omq.common.broker.Broker;
    1313import omq.common.util.ParameterQueue;
    14 import omq.exception.SerializerException;
    1514
    1615import org.apache.log4j.Logger;
    17 
    18 import com.rabbitmq.client.Channel;
    19 import com.rabbitmq.client.ConsumerCancelledException;
    20 import com.rabbitmq.client.QueueingConsumer;
    21 import com.rabbitmq.client.QueueingConsumer.Delivery;
    22 import com.rabbitmq.client.ShutdownSignalException;
    2316
    2417/**
     
    3225 *
    3326 */
    34 public abstract class RemoteObject extends Thread implements Remote {
     27public abstract class RemoteObject implements Remote {
    3528
    3629        private static final long serialVersionUID = -1778953938739846450L;
    37         private static final String multi = "multi#";
    3830        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
    3931
     
    4133        private Properties env;
    4234        private transient Broker broker;
    43         private transient String multiQueue;
    44         private transient RemoteWrapper remoteWrapper;
    4535        private transient Map<String, List<Class<?>>> params;
    46         private transient Channel channel;
    47         private transient QueueingConsumer consumer;
    48         private transient boolean killed = false;
     36        private transient List<InvocationThread> invocationList;
    4937
    5038        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
     
    6048        }
    6149
    62         public RemoteObject() {
    63         }
    64 
    6550        /**
    6651         * This method starts a remoteObject.
     
    9075                // Get num threads to use
    9176                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
    92                 this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
    93 
    94                 startQueues();
    95 
    96                 // Start this listener
    97                 this.start();
    98         }
    99 
    100         @Override
    101         public void run() {
    102                 while (!killed) {
    103                         try {
    104                                 Delivery delivery = consumer.nextDelivery();
    105 
    106                                 logger.debug(UID + " has received a message, serializer: " + delivery.getProperties().getType());
    107 
    108                                 remoteWrapper.notifyDelivery(delivery);
    109                         } catch (InterruptedException i) {
    110                                 logger.error(i);
    111                         } catch (ShutdownSignalException e) {
    112                                 logger.error(e);
    113                                 try {
    114                                         if (channel.isOpen()) {
    115                                                 channel.close();
    116                                         }
    117                                         startQueues();
    118                                 } catch (Exception e1) {
    119                                         try {
    120                                                 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
    121                                                 Thread.sleep(milis);
    122                                         } catch (InterruptedException e2) {
    123                                                 logger.error(e2);
    124                                         }
    125                                         logger.error(e1);
    126                                 }
    127                         } catch (ConsumerCancelledException e) {
    128                                 logger.error(e);
    129                         } catch (SerializerException e) {
    130                                 logger.error(e);
    131                         } catch (Exception e) {
    132                                 logger.error(e);
    133                         }
    134                 }
     77                invocationList = new ArrayList<InvocationThread>(numThreads);
     78
     79                // Start invocation threads
     80                for (int i = 0; i < numThreads; i++) {
     81                        InvocationThread iThread = new InvocationThread(this, broker);
     82                        invocationList.add(iThread);
     83                        iThread.start();
     84                }
     85
    13586        }
    13687
     
    14798         */
    14899        public void kill() throws IOException {
    149                 logger.warn("Killing objectmq: " + this.getRef());
    150                 killed = true;
    151                 interrupt();
    152                 channel.close();
    153                 remoteWrapper.stopRemoteWrapper();
     100                logger.info("Killing objectmq: " + this.getRef());
     101                for (InvocationThread iThread : invocationList) {
     102                        iThread.kill();
     103                }
    154104        }
    155105
     
    250200        }
    251201
    252         public Channel getChannel() {
    253                 return channel;
    254         }
    255 
    256202        public Broker getBroker() {
    257203                return broker;
    258204        }
    259205
    260         /**
    261          * This method starts the queues using the information got in the
    262          * environment.
    263          *
    264          * @throws Exception
    265          */
    266         private void startQueues() throws Exception {
    267                 // Start channel
    268                 channel = broker.getNewChannel();
    269 
    270                 /*
    271                  * Default queue, Round Robin behaviour
    272                  */
    273 
    274                 // Get info about which exchange and queue will use
    275                 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
    276                 String queue = UID;
    277                 String routingKey = UID;
    278 
    279                 // RemoteObject default queue
    280                 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
    281                 boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
    282                 boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
    283 
    284                 // Declares and bindings
    285                 if (!exchange.equalsIgnoreCase("")) { // Default exchange case
    286                         channel.exchangeDeclare(exchange, "direct");
    287                 }
    288                 channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
    289                 if (!exchange.equalsIgnoreCase("")) { // Default exchange case
    290                         channel.queueBind(queue, exchange, routingKey);
    291                 }
    292                 logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
    293                                 + exclusive + ", AutoDelete: " + autoDelete);
    294 
    295                 /*
    296                  * Multi queue, exclusive per each instance
    297                  */
    298 
    299                 // Get info about the multiQueue
    300                 String multiExchange = multi + UID;
    301                 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
    302 
    303                 // Multi queue (exclusive queue per remoteObject)
    304                 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
    305                 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
    306                 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
    307 
    308                 // Declares and bindings
    309                 channel.exchangeDeclare(multiExchange, "fanout");
    310                 if (multiQueue == null) {
    311                         multiQueue = channel.queueDeclare().getQueue();
    312                 } else {
    313                         channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
    314                 }
    315                 channel.queueBind(multiQueue, multiExchange, "");
    316                 logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
    317                                 + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
    318 
    319                 /*
    320                  * Consumer
    321                  */
    322 
    323                 boolean autoAck = false;
    324 
    325                 //TODO see if this is useless
    326                 int prefetchCount = 1;
    327                 channel.basicQos(prefetchCount);
    328 
    329                 // Declare a new consumer
    330                 consumer = new QueueingConsumer(channel);
    331                 channel.basicConsume(queue, autoAck, consumer);
    332                 channel.basicConsume(multiQueue, autoAck, consumer);
     206        public Properties getEnv() {
     207                return env;
    333208        }
    334209
Note: See TracChangeset for help on using the changeset viewer.