Ignore:
Timestamp:
10/21/13 15:27:22 (11 years ago)
Author:
stoda
Message:

Refactoring to enable multinvocation thread done.
Now there's a multiinvocation thread which listens to the multiqueue

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/server/MultiInvocationThread.java

    r107 r108  
    11package omq.server;
    22
    3 public class MultiInvocationThread {
     3import omq.common.util.ParameterQueue;
     4import omq.exception.SerializerException;
     5
     6import org.apache.log4j.Logger;
     7
     8import com.rabbitmq.client.ConsumerCancelledException;
     9import com.rabbitmq.client.QueueingConsumer;
     10import com.rabbitmq.client.QueueingConsumer.Delivery;
     11import com.rabbitmq.client.ShutdownSignalException;
     12
     13public class MultiInvocationThread extends AInvocationThread {
     14
     15        private static final Logger logger = Logger.getLogger(MultiInvocationThread.class.getName());
     16        private static final String multi = "multi#";
     17
     18        // Consumer
     19        private String multiQueue;
     20
     21        public MultiInvocationThread(RemoteObject obj) throws Exception {
     22                super(obj);
     23        }
     24
     25        @Override
     26        public void run() {
     27                while (!killed) {
     28                        try {
     29                                // Get the delivery
     30                                Delivery delivery = consumer.nextDelivery();
     31                                // This thread does not need to set busy because it's mandatory
     32                                // to exist
     33                                executeTask(delivery);
     34                        } catch (InterruptedException i) {
     35                                logger.error(i);
     36                        } catch (ShutdownSignalException e) {
     37                                logger.error(e);
     38                                try {
     39                                        if (channel.isOpen()) {
     40                                                channel.close();
     41                                        }
     42                                        startQueues();
     43                                } catch (Exception e1) {
     44                                        try {
     45                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
     46                                                Thread.sleep(milis);
     47                                        } catch (InterruptedException e2) {
     48                                                logger.error(e2);
     49                                        }
     50                                        logger.error(e1);
     51                                }
     52                        } catch (ConsumerCancelledException e) {
     53                                logger.error(e);
     54                        } catch (SerializerException e) {
     55                                logger.error(e);
     56                        } catch (Exception e) {
     57                                e.printStackTrace();
     58                                logger.error(e);
     59                        }
     60
     61                }
     62                logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed");
     63        }
     64
     65        @Override
     66        protected void startQueues() throws Exception {
     67                // Start channel
     68                channel = broker.getNewChannel();
     69
     70                /*
     71                 * Multi queue, exclusive per each instance
     72                 */
     73
     74                // Get info about the multiQueue
     75                String multiExchange = multi + reference;
     76                // TODO:String multiExchange = multi + exchange + reference;
     77                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
     78
     79                // Multi queue (exclusive queue per remoteObject)
     80                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
     81                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
     82                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
     83
     84                // Declares and bindings
     85                channel.exchangeDeclare(multiExchange, "fanout");
     86                if (multiQueue == null) {
     87                        multiQueue = channel.queueDeclare().getQueue();
     88                } else {
     89                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
     90                }
     91                channel.queueBind(multiQueue, multiExchange, "");
     92                logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: "
     93                                + multiDurable + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
     94
     95                /*
     96                 * Consumer
     97                 */
     98
     99                // Disable Round Robin behavior
     100                boolean autoAck = false;
     101
     102                // Declare a new consumer
     103                consumer = new QueueingConsumer(channel);
     104                channel.basicConsume(multiQueue, autoAck, consumer);
     105        }
    4106
    5107}
Note: See TracChangeset for help on using the changeset viewer.