package omq.server; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.Properties; import omq.common.broker.Broker; import omq.common.message.Request; import omq.common.message.Response; import omq.common.util.ParameterQueue; import omq.common.util.Serializer; import omq.exception.OmqException; import omq.exception.SerializerException; import org.apache.log4j.Logger; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; /** * An invocationThread waits for requests an invokes them. * * @author Sergi Toda * */ public class InvocationThread extends Thread { private static final Logger logger = Logger.getLogger(InvocationThread.class.getName()); private static final String multi = "multi#"; // RemoteObject private RemoteObject obj; private String UID; private Properties env; private boolean idle; private long lastExec; private RemoteThreadPool pool; // Broker private Broker broker; private Serializer serializer; // Consumer private Channel channel; private QueueingConsumer consumer; private String multiQueue; private boolean killed = false; public InvocationThread(RemoteObject obj) throws Exception { this.obj = obj; this.UID = obj.getRef(); this.env = obj.getEnv(); this.broker = obj.getBroker(); this.pool = obj.getPool(); this.serializer = broker.getSerializer(); this.lastExec = 0; this.idle = true; } @Override public synchronized void start() { try { startQueues(); super.start(); } catch (Exception e) { logger.error("Cannot start a remoteObject", e); } } @Override public void run() { while (!killed) { try { // Get the delivery Delivery delivery = consumer.nextDelivery(); // This thread gets busy pool.getBusy().incrementAndGet(); idle = false; String serializerType = delivery.getProperties().getType(); // Deserialize the json Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj); String methodName = request.getMethod(); String requestID = request.getId(); logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType); // Invoke the method Object result = null; OmqException error = null; try { result = obj.invokeMethod(request.getMethod(), request.getParams()); } catch (InvocationTargetException e) { Throwable throwable = e.getTargetException(); logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable); error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage()); } catch (NoSuchMethodException e) { logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName); error = new OmqException(e.getClass().getCanonicalName(), e.getMessage()); } // Reply if it's necessary if (!request.isAsync()) { Response resp = new Response(request.getId(), obj.getRef(), result, error); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build(); byte[] bytesResponse = serializer.serialize(serializerType, resp); channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse); logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: " + props.getReplyTo()); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // The thread is now idle lastExec = System.currentTimeMillis(); idle = true; pool.getBusy().decrementAndGet(); } catch (InterruptedException i) { logger.error(i); } catch (ShutdownSignalException e) { logger.error(e); try { if (channel.isOpen()) { channel.close(); } startQueues(); } catch (Exception e1) { try { long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); Thread.sleep(milis); } catch (InterruptedException e2) { logger.error(e2); } logger.error(e1); } } catch (ConsumerCancelledException e) { logger.error(e); } catch (SerializerException e) { logger.error(e); } catch (Exception e) { e.printStackTrace(); logger.error(e); } } logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed"); } /** * This method starts the queues using the information got in the * environment. * * @throws Exception */ private void startQueues() throws Exception { // Start channel channel = broker.getNewChannel(); /* * Default queue, Round Robin behaviour */ // Get info about which exchange and queue will use String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, ""); String queue = UID; String routingKey = UID; // RemoteObject default queue boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false")); boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false")); boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false")); // Declares and bindings if (!exchange.equalsIgnoreCase("")) { // Default exchange case channel.exchangeDeclare(exchange, "direct"); } channel.queueDeclare(queue, durable, exclusive, autoDelete, null); if (!exchange.equalsIgnoreCase("")) { // Default exchange case channel.queueBind(queue, exchange, routingKey); } logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete); /* * Multi queue, exclusive per each instance */ // Get info about the multiQueue String multiExchange = multi + UID; // TODO:String multiExchange = multi + exchange + UID; multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME); // Multi queue (exclusive queue per remoteObject) boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false")); boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true")); boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true")); // Declares and bindings channel.exchangeDeclare(multiExchange, "fanout"); if (multiQueue == null) { multiQueue = channel.queueDeclare().getQueue(); } else { channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null); } channel.queueBind(multiQueue, multiExchange, ""); logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete); /* * Consumer */ boolean autoAck = false; int prefetchCount = 1; channel.basicQos(prefetchCount); // Declare a new consumer consumer = new QueueingConsumer(channel); channel.basicConsume(queue, autoAck, consumer); channel.basicConsume(multiQueue, autoAck, consumer); } public void kill() throws IOException { logger.info("Killing objectmq: " + UID + " thread id"); killed = true; interrupt(); channel.close(); } public RemoteObject getObj() { return obj; } public void setObj(RemoteObject obj) { this.obj = obj; } public long getLastExecution() { return lastExec; } public boolean isIdle() { return idle; } }