Changeset 108


Ignore:
Timestamp:
10/21/13 15:27:22 (11 years ago)
Author:
stoda
Message:

Refactoring to enable multinvocation thread done.
Now there's a multiinvocation thread which listens to the multiqueue

Location:
branches/supervisor/src/main/java/omq/server
Files:
1 added
3 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/server/InvocationThread.java

    r107 r108  
    11package omq.server;
    22
    3 import java.io.IOException;
    4 import java.lang.reflect.InvocationTargetException;
    5 import java.util.Properties;
    6 
    7 import omq.common.broker.Broker;
    8 import omq.common.message.Request;
    9 import omq.common.message.Response;
    103import omq.common.util.ParameterQueue;
    11 import omq.common.util.Serializer;
    12 import omq.exception.OmqException;
    134import omq.exception.SerializerException;
    145
    156import org.apache.log4j.Logger;
    167
    17 import com.rabbitmq.client.AMQP.BasicProperties;
    18 import com.rabbitmq.client.Channel;
    198import com.rabbitmq.client.ConsumerCancelledException;
    209import com.rabbitmq.client.QueueingConsumer;
     
    2817 *
    2918 */
    30 public class InvocationThread extends Thread {
     19public class InvocationThread extends AInvocationThread {
    3120
    3221        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
    33         private static final String multi = "multi#";
    3422
    3523        // RemoteObject
    36         private RemoteObject obj;
    37         private String reference;
    38         private String UID;
    39         private Properties env;
    4024        private boolean idle;
    4125        private long lastExec;
    4226
    43         private RemoteThreadPool pool;
    44 
    45         // Broker
    46         private Broker broker;
    47         private Serializer serializer;
    48 
    49         // Consumer
    50         private Channel channel;
    51         private QueueingConsumer consumer;
    52         private String multiQueue;
    53         private boolean killed = false;
    54 
    5527        public InvocationThread(RemoteObject obj) throws Exception {
    56                 this.obj = obj;
    57                 this.UID = obj.getUID();
    58                 this.reference = obj.getRef();
    59                 this.env = obj.getEnv();
    60                 this.broker = obj.getBroker();
    61                 this.pool = obj.getPool();
    62                 this.serializer = broker.getSerializer();
     28                super(obj);
    6329                this.lastExec = 0;
    6430                this.idle = true;
    65         }
    66 
    67         @Override
    68         public synchronized void start() {
    69                 try {
    70                         startQueues();
    71                         super.start();
    72                 } catch (Exception e) {
    73                         logger.error("Cannot start a remoteObject", e);
    74                 }
    75 
    7631        }
    7732
     
    8742                                idle = false;
    8843
    89                                 String serializerType = delivery.getProperties().getType();
    90 
    91                                 // Deserialize the request
    92                                 Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
    93                                 String methodName = request.getMethod();
    94                                 String requestID = request.getId();
    95 
    96                                 logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType);
    97 
    98                                 // Invoke the method
    99                                 Object result = null;
    100                                 OmqException error = null;
    101                                 try {
    102                                         result = obj.invokeMethod(request.getMethod(), request.getParams());
    103                                 } catch (InvocationTargetException e) {
    104                                         Throwable throwable = e.getTargetException();
    105                                         logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
    106                                         error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
    107                                 } catch (NoSuchMethodException e) {
    108                                         logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
    109                                         error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
    110                                 }
    111 
    112                                 // Reply if it's necessary
    113                                 if (!request.isAsync()) {
    114                                         Response resp = new Response(request.getId(), obj.getRef(), result, error);
    115 
    116                                         BasicProperties props = delivery.getProperties();
    117 
    118                                         BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
    119 
    120                                         byte[] bytesResponse = serializer.serialize(serializerType, resp);
    121                                         channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
    122                                         logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: "
    123                                                         + props.getReplyTo());
    124                                 }
    125 
    126                                 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     44                                executeTask(delivery);
    12745
    12846                                // The thread is now idle
     
    16886         * @throws Exception
    16987         */
    170         private void startQueues() throws Exception {
     88        protected void startQueues() throws Exception {
    17189                // Start channel
    17290                channel = broker.getNewChannel();
     
    190108                        channel.queueBind(queue, exchange, routingKey);
    191109                }
    192                 logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
    193                                 + exclusive + ", AutoDelete: " + autoDelete);
     110                logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable
     111                                + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete);
    194112
    195113                /*
     
    207125                                channel.queueBind(UID, exchange, UID);
    208126                        }
     127                        // TODO logger queue
     128                        // TODO UID queue should be reference + UID
    209129                }
    210 
    211                 /*
    212                  * Multi queue, exclusive per each instance
    213                  */
    214 
    215                 // Get info about the multiQueue
    216                 String multiExchange = multi + reference;
    217                 // TODO:String multiExchange = multi + exchange + reference;
    218                 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
    219 
    220                 // Multi queue (exclusive queue per remoteObject)
    221                 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
    222                 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
    223                 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
    224 
    225                 // Declares and bindings
    226                 channel.exchangeDeclare(multiExchange, "fanout");
    227                 if (multiQueue == null) {
    228                         multiQueue = channel.queueDeclare().getQueue();
    229                 } else {
    230                         channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
    231                 }
    232                 channel.queueBind(multiQueue, multiExchange, "");
    233                 logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
    234                                 + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
    235130
    236131                /*
     
    238133                 */
    239134
    240                 // Disable Round Robin behavior 
     135                // Disable Round Robin behavior
    241136                boolean autoAck = false;
    242137
     
    247142                consumer = new QueueingConsumer(channel);
    248143                channel.basicConsume(queue, autoAck, consumer);
    249                 channel.basicConsume(multiQueue, autoAck, consumer);
    250144                if (UID != null) {
    251145                        channel.basicConsume(UID, autoAck, consumer);
    252146                }
    253         }
    254 
    255         public void kill() throws IOException {
    256                 logger.info("Killing objectmq: " + reference + " thread id");
    257                 killed = true;
    258                 interrupt();
    259                 channel.close();
    260         }
    261 
    262         public RemoteObject getObj() {
    263                 return obj;
    264         }
    265 
    266         public void setObj(RemoteObject obj) {
    267                 this.obj = obj;
    268147        }
    269148
  • branches/supervisor/src/main/java/omq/server/MultiInvocationThread.java

    r107 r108  
    11package omq.server;
    22
    3 public class MultiInvocationThread {
     3import omq.common.util.ParameterQueue;
     4import omq.exception.SerializerException;
     5
     6import org.apache.log4j.Logger;
     7
     8import com.rabbitmq.client.ConsumerCancelledException;
     9import com.rabbitmq.client.QueueingConsumer;
     10import com.rabbitmq.client.QueueingConsumer.Delivery;
     11import com.rabbitmq.client.ShutdownSignalException;
     12
     13public class MultiInvocationThread extends AInvocationThread {
     14
     15        private static final Logger logger = Logger.getLogger(MultiInvocationThread.class.getName());
     16        private static final String multi = "multi#";
     17
     18        // Consumer
     19        private String multiQueue;
     20
     21        public MultiInvocationThread(RemoteObject obj) throws Exception {
     22                super(obj);
     23        }
     24
     25        @Override
     26        public void run() {
     27                while (!killed) {
     28                        try {
     29                                // Get the delivery
     30                                Delivery delivery = consumer.nextDelivery();
     31                                // This thread does not need to set busy because it's mandatory
     32                                // to exist
     33                                executeTask(delivery);
     34                        } catch (InterruptedException i) {
     35                                logger.error(i);
     36                        } catch (ShutdownSignalException e) {
     37                                logger.error(e);
     38                                try {
     39                                        if (channel.isOpen()) {
     40                                                channel.close();
     41                                        }
     42                                        startQueues();
     43                                } catch (Exception e1) {
     44                                        try {
     45                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
     46                                                Thread.sleep(milis);
     47                                        } catch (InterruptedException e2) {
     48                                                logger.error(e2);
     49                                        }
     50                                        logger.error(e1);
     51                                }
     52                        } catch (ConsumerCancelledException e) {
     53                                logger.error(e);
     54                        } catch (SerializerException e) {
     55                                logger.error(e);
     56                        } catch (Exception e) {
     57                                e.printStackTrace();
     58                                logger.error(e);
     59                        }
     60
     61                }
     62                logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed");
     63        }
     64
     65        @Override
     66        protected void startQueues() throws Exception {
     67                // Start channel
     68                channel = broker.getNewChannel();
     69
     70                /*
     71                 * Multi queue, exclusive per each instance
     72                 */
     73
     74                // Get info about the multiQueue
     75                String multiExchange = multi + reference;
     76                // TODO:String multiExchange = multi + exchange + reference;
     77                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
     78
     79                // Multi queue (exclusive queue per remoteObject)
     80                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
     81                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
     82                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
     83
     84                // Declares and bindings
     85                channel.exchangeDeclare(multiExchange, "fanout");
     86                if (multiQueue == null) {
     87                        multiQueue = channel.queueDeclare().getQueue();
     88                } else {
     89                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
     90                }
     91                channel.queueBind(multiQueue, multiExchange, "");
     92                logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: "
     93                                + multiDurable + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
     94
     95                /*
     96                 * Consumer
     97                 */
     98
     99                // Disable Round Robin behavior
     100                boolean autoAck = false;
     101
     102                // Declare a new consumer
     103                consumer = new QueueingConsumer(channel);
     104                channel.basicConsume(multiQueue, autoAck, consumer);
     105        }
    4106
    5107}
  • branches/supervisor/src/main/java/omq/server/RemoteThreadPool.java

    r102 r108  
    2323        private static final Logger logger = Logger.getLogger(RemoteThreadPool.class.getName());
    2424        private List<InvocationThread> workers;
     25        private MultiInvocationThread multiWorker;
    2526        private AtomicInteger busy;
    2627        private int minPoolThreads;
     
    3435        private boolean killed = false;
    3536
    36         public RemoteThreadPool(int minPoolThreads, int maxPoolThreads, long refresh, long keepAliveTime, int maxMessagesPerThread, RemoteObject obj, Broker broker) {
     37        public RemoteThreadPool(int minPoolThreads, int maxPoolThreads, long refresh, long keepAliveTime, int maxMessagesPerThread,
     38                        RemoteObject obj, Broker broker) {
    3739                this.minPoolThreads = minPoolThreads;
    3840                this.maxPoolThreads = maxPoolThreads;
     
    5355                 * Create and start minPoolThreads
    5456                 */
    55                 logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads + ", refresh time: "
    56                                 + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread);
     57                logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads
     58                                + ", refresh time: " + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread);
     59
     60                try {
     61                        multiWorker = new MultiInvocationThread(obj);
     62                        multiWorker.start();
     63                } catch (Exception e1) {
     64                        // TODO Auto-generated catch block
     65                        e1.printStackTrace();
     66                }
    5767
    5868                for (int i = 0; i < minPoolThreads; i++) {
     
    8696                                } else if (numWorkers > minPoolThreads && busy.get() < numWorkers) {
    8797                                        // Kill idle threads
    88                                         System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = " + busy.get());
     98                                        System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = "
     99                                                        + busy.get());
    89100                                        stopIdleThreads();
    90101                                }
Note: See TracChangeset for help on using the changeset viewer.