Changeset 96


Ignore:
Timestamp:
10/02/13 17:31:26 (11 years ago)
Author:
stoda
Message:

ObjectMQ without RemoteWrapper? - one consumer per thread -

Location:
branches/supervisor/src
Files:
4 added
1 deleted
4 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/server/InvocationThread.java

    r91 r96  
    11package omq.server;
    22
     3import java.io.IOException;
    34import java.lang.reflect.InvocationTargetException;
    4 import java.util.concurrent.BlockingQueue;
    5 
     5import java.util.Properties;
     6
     7import omq.common.broker.Broker;
    68import omq.common.message.Request;
    79import omq.common.message.Response;
     10import omq.common.util.ParameterQueue;
    811import omq.common.util.Serializer;
    912import omq.exception.OmqException;
     13import omq.exception.SerializerException;
    1014
    1115import org.apache.log4j.Logger;
     
    1317import com.rabbitmq.client.AMQP.BasicProperties;
    1418import com.rabbitmq.client.Channel;
     19import com.rabbitmq.client.ConsumerCancelledException;
     20import com.rabbitmq.client.QueueingConsumer;
    1521import com.rabbitmq.client.QueueingConsumer.Delivery;
     22import com.rabbitmq.client.ShutdownSignalException;
    1623
    1724/**
     
    2229 */
    2330public class InvocationThread extends Thread {
     31
    2432        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
     33        private static final String multi = "multi#";
     34
     35        // RemoteObject
    2536        private RemoteObject obj;
     37        private String UID;
     38        private Properties env;
     39
     40        // Broker
     41        private Broker broker;
    2642        private Serializer serializer;
    27         // private RemoteWrapper wrapper;
    28         private BlockingQueue<Delivery> deliveryQueue;
     43
     44        // Consumer
     45        private Channel channel;
     46        private QueueingConsumer consumer;
     47        private String multiQueue;
    2948        private boolean killed = false;
    3049
    31         public InvocationThread(RemoteObject obj, RemoteWrapper wrapper, Serializer serializer) {
     50        public InvocationThread(RemoteObject obj, Broker broker) throws Exception {
    3251                this.obj = obj;
    33                 // this.wrapper = wrapper;
    34                 this.deliveryQueue = wrapper.getDeliveryQueue();
    35                 this.serializer = serializer;
     52                this.UID = obj.getRef();
     53                this.env = obj.getEnv();
     54                this.broker = broker;
     55                this.serializer = broker.getSerializer();
     56        }
     57
     58        @Override
     59        public synchronized void start() {
     60                try {
     61                        startQueues();
     62                        super.start();
     63                } catch (Exception e) {
     64                        logger.error("Cannot start a remoteObject", e);
     65                }
     66
    3667        }
    3768
     
    4172                        try {
    4273                                // Get the delivery
    43                                 Delivery delivery = deliveryQueue.take();
    44 
    45                                 // // Indicate this thread is not available
    46                                 // wrapper.increaseBusy();
     74                                Delivery delivery = consumer.nextDelivery();
    4775
    4876                                String serializerType = delivery.getProperties().getType();
     
    6997                                }
    7098
    71                                
    72                                 Channel channel = obj.getChannel();
    73                                
    74                                
    7599                                // Reply if it's necessary
    76100                                if (!request.isAsync()) {
    77101                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
    78 
    79                                        
    80102
    81103                                        BasicProperties props = delivery.getProperties();
     
    88110                                                        + props.getReplyTo());
    89111                                }
    90                                
     112
    91113                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    92                                
    93                                 // // Indicate this thread is available
    94                                 // wrapper.decreaseBusy();
    95114                        } catch (InterruptedException i) {
    96115                                logger.error(i);
    97                                 killed = true;
     116                        } catch (ShutdownSignalException e) {
     117                                logger.error(e);
     118                                try {
     119                                        if (channel.isOpen()) {
     120                                                channel.close();
     121                                        }
     122                                        startQueues();
     123                                } catch (Exception e1) {
     124                                        try {
     125                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
     126                                                Thread.sleep(milis);
     127                                        } catch (InterruptedException e2) {
     128                                                logger.error(e2);
     129                                        }
     130                                        logger.error(e1);
     131                                }
     132                        } catch (ConsumerCancelledException e) {
     133                                logger.error(e);
     134                        } catch (SerializerException e) {
     135                                logger.error(e);
    98136                        } catch (Exception e) {
    99                                 logger.error("Object: " + obj.getRef(), e);
     137                                logger.error(e);
    100138                        }
    101139
    102140                }
     141        }
     142
     143        /**
     144         * This method starts the queues using the information got in the
     145         * environment.
     146         *
     147         * @throws Exception
     148         */
     149        private void startQueues() throws Exception {
     150                // Start channel
     151                channel = broker.getNewChannel();
     152
     153                /*
     154                 * Default queue, Round Robin behaviour
     155                 */
     156
     157                // Get info about which exchange and queue will use
     158                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
     159                String queue = UID;
     160                String routingKey = UID;
     161
     162                // RemoteObject default queue
     163                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
     164                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
     165                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
     166
     167                // Declares and bindings
     168                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
     169                        channel.exchangeDeclare(exchange, "direct");
     170                }
     171                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
     172                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
     173                        channel.queueBind(queue, exchange, routingKey);
     174                }
     175                logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
     176                                + exclusive + ", AutoDelete: " + autoDelete);
     177
     178                /*
     179                 * Multi queue, exclusive per each instance
     180                 */
     181
     182                // Get info about the multiQueue
     183                String multiExchange = multi + UID;
     184                // TODO:String multiExchange = multi + exchange + UID;
     185                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
     186
     187                // Multi queue (exclusive queue per remoteObject)
     188                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
     189                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
     190                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
     191
     192                // Declares and bindings
     193                channel.exchangeDeclare(multiExchange, "fanout");
     194                if (multiQueue == null) {
     195                        multiQueue = channel.queueDeclare().getQueue();
     196                } else {
     197                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
     198                }
     199                channel.queueBind(multiQueue, multiExchange, "");
     200                logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
     201                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
     202
     203                /*
     204                 * Consumer
     205                 */
     206
     207                boolean autoAck = false;
     208
     209                int prefetchCount = 1;
     210                channel.basicQos(prefetchCount);
     211
     212                // Declare a new consumer
     213                consumer = new QueueingConsumer(channel);
     214                channel.basicConsume(queue, autoAck, consumer);
     215                channel.basicConsume(multiQueue, autoAck, consumer);
     216        }
     217
     218        public void kill() throws IOException {
     219                logger.info("Killing objectmq: " + UID + " thread id");
     220                killed = true;
     221                interrupt();
     222                channel.close();
    103223        }
    104224
     
    111231        }
    112232
    113         public BlockingQueue<Delivery> getDeliveryQueue() {
    114                 return deliveryQueue;
    115         }
    116 
    117         public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
    118                 this.deliveryQueue = deliveryQueue;
    119         }
    120233}
  • branches/supervisor/src/main/java/omq/server/RemoteObject.java

    r92 r96  
    1212import omq.common.broker.Broker;
    1313import omq.common.util.ParameterQueue;
    14 import omq.exception.SerializerException;
    1514
    1615import org.apache.log4j.Logger;
    17 
    18 import com.rabbitmq.client.Channel;
    19 import com.rabbitmq.client.ConsumerCancelledException;
    20 import com.rabbitmq.client.QueueingConsumer;
    21 import com.rabbitmq.client.QueueingConsumer.Delivery;
    22 import com.rabbitmq.client.ShutdownSignalException;
    2316
    2417/**
     
    3225 *
    3326 */
    34 public abstract class RemoteObject extends Thread implements Remote {
     27public abstract class RemoteObject implements Remote {
    3528
    3629        private static final long serialVersionUID = -1778953938739846450L;
    37         private static final String multi = "multi#";
    3830        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
    3931
     
    4133        private Properties env;
    4234        private transient Broker broker;
    43         private transient String multiQueue;
    44         private transient RemoteWrapper remoteWrapper;
    4535        private transient Map<String, List<Class<?>>> params;
    46         private transient Channel channel;
    47         private transient QueueingConsumer consumer;
    48         private transient boolean killed = false;
     36        private transient List<InvocationThread> invocationList;
    4937
    5038        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
     
    6048        }
    6149
    62         public RemoteObject() {
    63         }
    64 
    6550        /**
    6651         * This method starts a remoteObject.
     
    9075                // Get num threads to use
    9176                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
    92                 this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
    93 
    94                 startQueues();
    95 
    96                 // Start this listener
    97                 this.start();
    98         }
    99 
    100         @Override
    101         public void run() {
    102                 while (!killed) {
    103                         try {
    104                                 Delivery delivery = consumer.nextDelivery();
    105 
    106                                 logger.debug(UID + " has received a message, serializer: " + delivery.getProperties().getType());
    107 
    108                                 remoteWrapper.notifyDelivery(delivery);
    109                         } catch (InterruptedException i) {
    110                                 logger.error(i);
    111                         } catch (ShutdownSignalException e) {
    112                                 logger.error(e);
    113                                 try {
    114                                         if (channel.isOpen()) {
    115                                                 channel.close();
    116                                         }
    117                                         startQueues();
    118                                 } catch (Exception e1) {
    119                                         try {
    120                                                 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
    121                                                 Thread.sleep(milis);
    122                                         } catch (InterruptedException e2) {
    123                                                 logger.error(e2);
    124                                         }
    125                                         logger.error(e1);
    126                                 }
    127                         } catch (ConsumerCancelledException e) {
    128                                 logger.error(e);
    129                         } catch (SerializerException e) {
    130                                 logger.error(e);
    131                         } catch (Exception e) {
    132                                 logger.error(e);
    133                         }
    134                 }
     77                invocationList = new ArrayList<InvocationThread>(numThreads);
     78
     79                // Start invocation threads
     80                for (int i = 0; i < numThreads; i++) {
     81                        InvocationThread iThread = new InvocationThread(this, broker);
     82                        invocationList.add(iThread);
     83                        iThread.start();
     84                }
     85
    13586        }
    13687
     
    14798         */
    14899        public void kill() throws IOException {
    149                 logger.warn("Killing objectmq: " + this.getRef());
    150                 killed = true;
    151                 interrupt();
    152                 channel.close();
    153                 remoteWrapper.stopRemoteWrapper();
     100                logger.info("Killing objectmq: " + this.getRef());
     101                for (InvocationThread iThread : invocationList) {
     102                        iThread.kill();
     103                }
    154104        }
    155105
     
    250200        }
    251201
    252         public Channel getChannel() {
    253                 return channel;
    254         }
    255 
    256202        public Broker getBroker() {
    257203                return broker;
    258204        }
    259205
    260         /**
    261          * This method starts the queues using the information got in the
    262          * environment.
    263          *
    264          * @throws Exception
    265          */
    266         private void startQueues() throws Exception {
    267                 // Start channel
    268                 channel = broker.getNewChannel();
    269 
    270                 /*
    271                  * Default queue, Round Robin behaviour
    272                  */
    273 
    274                 // Get info about which exchange and queue will use
    275                 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
    276                 String queue = UID;
    277                 String routingKey = UID;
    278 
    279                 // RemoteObject default queue
    280                 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
    281                 boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
    282                 boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
    283 
    284                 // Declares and bindings
    285                 if (!exchange.equalsIgnoreCase("")) { // Default exchange case
    286                         channel.exchangeDeclare(exchange, "direct");
    287                 }
    288                 channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
    289                 if (!exchange.equalsIgnoreCase("")) { // Default exchange case
    290                         channel.queueBind(queue, exchange, routingKey);
    291                 }
    292                 logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
    293                                 + exclusive + ", AutoDelete: " + autoDelete);
    294 
    295                 /*
    296                  * Multi queue, exclusive per each instance
    297                  */
    298 
    299                 // Get info about the multiQueue
    300                 String multiExchange = multi + UID;
    301                 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
    302 
    303                 // Multi queue (exclusive queue per remoteObject)
    304                 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
    305                 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
    306                 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
    307 
    308                 // Declares and bindings
    309                 channel.exchangeDeclare(multiExchange, "fanout");
    310                 if (multiQueue == null) {
    311                         multiQueue = channel.queueDeclare().getQueue();
    312                 } else {
    313                         channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
    314                 }
    315                 channel.queueBind(multiQueue, multiExchange, "");
    316                 logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
    317                                 + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
    318 
    319                 /*
    320                  * Consumer
    321                  */
    322 
    323                 boolean autoAck = false;
    324 
    325                 //TODO see if this is useless
    326                 int prefetchCount = 1;
    327                 channel.basicQos(prefetchCount);
    328 
    329                 // Declare a new consumer
    330                 consumer = new QueueingConsumer(channel);
    331                 channel.basicConsume(queue, autoAck, consumer);
    332                 channel.basicConsume(multiQueue, autoAck, consumer);
     206        public Properties getEnv() {
     207                return env;
    333208        }
    334209
  • branches/supervisor/src/test/java/omq/test/supervisor/SleepImpl.java

    r95 r96  
    1616                try {
    1717                        System.out.println("I'm going to sleep!!!!!!!!" + Thread.currentThread().getId());
    18                         Thread.sleep(1000);
     18                        Thread.sleep(2000);
    1919                } catch (InterruptedException e) {
    2020                        e.printStackTrace();
  • branches/supervisor/src/test/java/omq/test/supervisor/SleepTest.java

    r95 r96  
    2424                env1.setProperty(ParameterQueue.RABBIT_HOST, "127.0.0.1");
    2525                env1.setProperty(ParameterQueue.RABBIT_PORT, "5672");
    26                 env1.setProperty(ParameterQueue.NUM_THREADS, "1");
     26                env1.setProperty(ParameterQueue.NUM_THREADS, "4");
    2727
    2828                Broker broker = new Broker(env1);
Note: See TracChangeset for help on using the changeset viewer.