Ignore:
Timestamp:
10/21/13 15:27:22 (11 years ago)
Author:
stoda
Message:

Refactoring to enable multinvocation thread done.
Now there's a multiinvocation thread which listens to the multiqueue

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/server/InvocationThread.java

    r107 r108  
    11package omq.server;
    22
    3 import java.io.IOException;
    4 import java.lang.reflect.InvocationTargetException;
    5 import java.util.Properties;
    6 
    7 import omq.common.broker.Broker;
    8 import omq.common.message.Request;
    9 import omq.common.message.Response;
    103import omq.common.util.ParameterQueue;
    11 import omq.common.util.Serializer;
    12 import omq.exception.OmqException;
    134import omq.exception.SerializerException;
    145
    156import org.apache.log4j.Logger;
    167
    17 import com.rabbitmq.client.AMQP.BasicProperties;
    18 import com.rabbitmq.client.Channel;
    198import com.rabbitmq.client.ConsumerCancelledException;
    209import com.rabbitmq.client.QueueingConsumer;
     
    2817 *
    2918 */
    30 public class InvocationThread extends Thread {
     19public class InvocationThread extends AInvocationThread {
    3120
    3221        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
    33         private static final String multi = "multi#";
    3422
    3523        // RemoteObject
    36         private RemoteObject obj;
    37         private String reference;
    38         private String UID;
    39         private Properties env;
    4024        private boolean idle;
    4125        private long lastExec;
    4226
    43         private RemoteThreadPool pool;
    44 
    45         // Broker
    46         private Broker broker;
    47         private Serializer serializer;
    48 
    49         // Consumer
    50         private Channel channel;
    51         private QueueingConsumer consumer;
    52         private String multiQueue;
    53         private boolean killed = false;
    54 
    5527        public InvocationThread(RemoteObject obj) throws Exception {
    56                 this.obj = obj;
    57                 this.UID = obj.getUID();
    58                 this.reference = obj.getRef();
    59                 this.env = obj.getEnv();
    60                 this.broker = obj.getBroker();
    61                 this.pool = obj.getPool();
    62                 this.serializer = broker.getSerializer();
     28                super(obj);
    6329                this.lastExec = 0;
    6430                this.idle = true;
    65         }
    66 
    67         @Override
    68         public synchronized void start() {
    69                 try {
    70                         startQueues();
    71                         super.start();
    72                 } catch (Exception e) {
    73                         logger.error("Cannot start a remoteObject", e);
    74                 }
    75 
    7631        }
    7732
     
    8742                                idle = false;
    8843
    89                                 String serializerType = delivery.getProperties().getType();
    90 
    91                                 // Deserialize the request
    92                                 Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
    93                                 String methodName = request.getMethod();
    94                                 String requestID = request.getId();
    95 
    96                                 logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType);
    97 
    98                                 // Invoke the method
    99                                 Object result = null;
    100                                 OmqException error = null;
    101                                 try {
    102                                         result = obj.invokeMethod(request.getMethod(), request.getParams());
    103                                 } catch (InvocationTargetException e) {
    104                                         Throwable throwable = e.getTargetException();
    105                                         logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable);
    106                                         error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
    107                                 } catch (NoSuchMethodException e) {
    108                                         logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName);
    109                                         error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
    110                                 }
    111 
    112                                 // Reply if it's necessary
    113                                 if (!request.isAsync()) {
    114                                         Response resp = new Response(request.getId(), obj.getRef(), result, error);
    115 
    116                                         BasicProperties props = delivery.getProperties();
    117 
    118                                         BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
    119 
    120                                         byte[] bytesResponse = serializer.serialize(serializerType, resp);
    121                                         channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
    122                                         logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: "
    123                                                         + props.getReplyTo());
    124                                 }
    125 
    126                                 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     44                                executeTask(delivery);
    12745
    12846                                // The thread is now idle
     
    16886         * @throws Exception
    16987         */
    170         private void startQueues() throws Exception {
     88        protected void startQueues() throws Exception {
    17189                // Start channel
    17290                channel = broker.getNewChannel();
     
    190108                        channel.queueBind(queue, exchange, routingKey);
    191109                }
    192                 logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
    193                                 + exclusive + ", AutoDelete: " + autoDelete);
     110                logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable
     111                                + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete);
    194112
    195113                /*
     
    207125                                channel.queueBind(UID, exchange, UID);
    208126                        }
     127                        // TODO logger queue
     128                        // TODO UID queue should be reference + UID
    209129                }
    210 
    211                 /*
    212                  * Multi queue, exclusive per each instance
    213                  */
    214 
    215                 // Get info about the multiQueue
    216                 String multiExchange = multi + reference;
    217                 // TODO:String multiExchange = multi + exchange + reference;
    218                 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
    219 
    220                 // Multi queue (exclusive queue per remoteObject)
    221                 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
    222                 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
    223                 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
    224 
    225                 // Declares and bindings
    226                 channel.exchangeDeclare(multiExchange, "fanout");
    227                 if (multiQueue == null) {
    228                         multiQueue = channel.queueDeclare().getQueue();
    229                 } else {
    230                         channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
    231                 }
    232                 channel.queueBind(multiQueue, multiExchange, "");
    233                 logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
    234                                 + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
    235130
    236131                /*
     
    238133                 */
    239134
    240                 // Disable Round Robin behavior 
     135                // Disable Round Robin behavior
    241136                boolean autoAck = false;
    242137
     
    247142                consumer = new QueueingConsumer(channel);
    248143                channel.basicConsume(queue, autoAck, consumer);
    249                 channel.basicConsume(multiQueue, autoAck, consumer);
    250144                if (UID != null) {
    251145                        channel.basicConsume(UID, autoAck, consumer);
    252146                }
    253         }
    254 
    255         public void kill() throws IOException {
    256                 logger.info("Killing objectmq: " + reference + " thread id");
    257                 killed = true;
    258                 interrupt();
    259                 channel.close();
    260         }
    261 
    262         public RemoteObject getObj() {
    263                 return obj;
    264         }
    265 
    266         public void setObj(RemoteObject obj) {
    267                 this.obj = obj;
    268147        }
    269148
Note: See TracChangeset for help on using the changeset viewer.