[44] | 1 | package omq.server; |
---|
| 2 | |
---|
[96] | 3 | import omq.common.util.ParameterQueue; |
---|
| 4 | import omq.exception.SerializerException; |
---|
[44] | 5 | |
---|
[49] | 6 | import org.apache.log4j.Logger; |
---|
| 7 | |
---|
[96] | 8 | import com.rabbitmq.client.ConsumerCancelledException; |
---|
| 9 | import com.rabbitmq.client.QueueingConsumer; |
---|
[44] | 10 | import com.rabbitmq.client.QueueingConsumer.Delivery; |
---|
[96] | 11 | import com.rabbitmq.client.ShutdownSignalException; |
---|
[44] | 12 | |
---|
| 13 | /** |
---|
[83] | 14 | * An invocationThread waits for requests an invokes them. |
---|
[44] | 15 | * |
---|
| 16 | * @author Sergi Toda <sergi.toda@estudiants.urv.cat> |
---|
| 17 | * |
---|
| 18 | */ |
---|
[108] | 19 | public class InvocationThread extends AInvocationThread { |
---|
[96] | 20 | |
---|
[49] | 21 | private static final Logger logger = Logger.getLogger(InvocationThread.class.getName()); |
---|
[96] | 22 | |
---|
| 23 | // RemoteObject |
---|
[100] | 24 | private boolean idle; |
---|
| 25 | private long lastExec; |
---|
[96] | 26 | |
---|
[101] | 27 | public InvocationThread(RemoteObject obj) throws Exception { |
---|
[108] | 28 | super(obj); |
---|
[100] | 29 | this.lastExec = 0; |
---|
| 30 | this.idle = true; |
---|
[44] | 31 | } |
---|
| 32 | |
---|
| 33 | @Override |
---|
| 34 | public void run() { |
---|
| 35 | while (!killed) { |
---|
| 36 | try { |
---|
| 37 | // Get the delivery |
---|
[96] | 38 | Delivery delivery = consumer.nextDelivery(); |
---|
[44] | 39 | |
---|
[100] | 40 | // This thread gets busy |
---|
| 41 | pool.getBusy().incrementAndGet(); |
---|
| 42 | idle = false; |
---|
| 43 | |
---|
[108] | 44 | executeTask(delivery); |
---|
[47] | 45 | |
---|
[100] | 46 | // The thread is now idle |
---|
| 47 | lastExec = System.currentTimeMillis(); |
---|
| 48 | idle = true; |
---|
| 49 | pool.getBusy().decrementAndGet(); |
---|
| 50 | |
---|
[44] | 51 | } catch (InterruptedException i) { |
---|
[49] | 52 | logger.error(i); |
---|
[96] | 53 | } catch (ShutdownSignalException e) { |
---|
| 54 | logger.error(e); |
---|
| 55 | try { |
---|
| 56 | if (channel.isOpen()) { |
---|
| 57 | channel.close(); |
---|
| 58 | } |
---|
| 59 | startQueues(); |
---|
| 60 | } catch (Exception e1) { |
---|
| 61 | try { |
---|
| 62 | long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); |
---|
| 63 | Thread.sleep(milis); |
---|
| 64 | } catch (InterruptedException e2) { |
---|
| 65 | logger.error(e2); |
---|
| 66 | } |
---|
| 67 | logger.error(e1); |
---|
| 68 | } |
---|
| 69 | } catch (ConsumerCancelledException e) { |
---|
| 70 | logger.error(e); |
---|
| 71 | } catch (SerializerException e) { |
---|
| 72 | logger.error(e); |
---|
[44] | 73 | } catch (Exception e) { |
---|
[101] | 74 | e.printStackTrace(); |
---|
[96] | 75 | logger.error(e); |
---|
[44] | 76 | } |
---|
| 77 | |
---|
| 78 | } |
---|
[102] | 79 | logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed"); |
---|
[44] | 80 | } |
---|
| 81 | |
---|
[96] | 82 | /** |
---|
| 83 | * This method starts the queues using the information got in the |
---|
| 84 | * environment. |
---|
| 85 | * |
---|
| 86 | * @throws Exception |
---|
| 87 | */ |
---|
[108] | 88 | protected void startQueues() throws Exception { |
---|
[96] | 89 | // Start channel |
---|
| 90 | channel = broker.getNewChannel(); |
---|
| 91 | |
---|
| 92 | // Get info about which exchange and queue will use |
---|
| 93 | String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, ""); |
---|
[105] | 94 | String queue = reference; |
---|
| 95 | String routingKey = reference; |
---|
[96] | 96 | |
---|
| 97 | // RemoteObject default queue |
---|
| 98 | boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false")); |
---|
| 99 | boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false")); |
---|
| 100 | boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false")); |
---|
| 101 | |
---|
| 102 | // Declares and bindings |
---|
| 103 | if (!exchange.equalsIgnoreCase("")) { // Default exchange case |
---|
| 104 | channel.exchangeDeclare(exchange, "direct"); |
---|
| 105 | } |
---|
| 106 | channel.queueDeclare(queue, durable, exclusive, autoDelete, null); |
---|
| 107 | if (!exchange.equalsIgnoreCase("")) { // Default exchange case |
---|
| 108 | channel.queueBind(queue, exchange, routingKey); |
---|
| 109 | } |
---|
[108] | 110 | logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable |
---|
| 111 | + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete); |
---|
[96] | 112 | |
---|
| 113 | /* |
---|
[106] | 114 | * UID queue |
---|
| 115 | */ |
---|
| 116 | |
---|
| 117 | if (UID != null) { |
---|
| 118 | |
---|
| 119 | boolean uidDurable = false; |
---|
| 120 | boolean uidExclusive = true; |
---|
| 121 | boolean uidAutoDelete = true; |
---|
| 122 | |
---|
| 123 | channel.queueDeclare(UID, uidDurable, uidExclusive, uidAutoDelete, null); |
---|
| 124 | if (!exchange.equalsIgnoreCase("")) { // Default exchange case |
---|
| 125 | channel.queueBind(UID, exchange, UID); |
---|
| 126 | } |
---|
[108] | 127 | // TODO logger queue |
---|
| 128 | // TODO UID queue should be reference + UID |
---|
[106] | 129 | } |
---|
| 130 | |
---|
| 131 | /* |
---|
[96] | 132 | * Consumer |
---|
| 133 | */ |
---|
| 134 | |
---|
[108] | 135 | // Disable Round Robin behavior |
---|
[96] | 136 | boolean autoAck = false; |
---|
| 137 | |
---|
| 138 | int prefetchCount = 1; |
---|
| 139 | channel.basicQos(prefetchCount); |
---|
| 140 | |
---|
| 141 | // Declare a new consumer |
---|
| 142 | consumer = new QueueingConsumer(channel); |
---|
| 143 | channel.basicConsume(queue, autoAck, consumer); |
---|
[106] | 144 | if (UID != null) { |
---|
| 145 | channel.basicConsume(UID, autoAck, consumer); |
---|
| 146 | } |
---|
[96] | 147 | } |
---|
| 148 | |
---|
[100] | 149 | public long getLastExecution() { |
---|
| 150 | return lastExec; |
---|
| 151 | } |
---|
| 152 | |
---|
| 153 | public boolean isIdle() { |
---|
| 154 | return idle; |
---|
| 155 | } |
---|
| 156 | |
---|
[44] | 157 | } |
---|