1 | package omq.server; |
---|
2 | |
---|
3 | import omq.common.util.ParameterQueue; |
---|
4 | import omq.exception.SerializerException; |
---|
5 | |
---|
6 | import org.apache.log4j.Logger; |
---|
7 | |
---|
8 | import com.rabbitmq.client.ConsumerCancelledException; |
---|
9 | import com.rabbitmq.client.QueueingConsumer; |
---|
10 | import com.rabbitmq.client.QueueingConsumer.Delivery; |
---|
11 | import com.rabbitmq.client.ShutdownSignalException; |
---|
12 | |
---|
13 | /** |
---|
14 | * An invocationThread waits for requests an invokes them. |
---|
15 | * |
---|
16 | * @author Sergi Toda <sergi.toda@estudiants.urv.cat> |
---|
17 | * |
---|
18 | */ |
---|
19 | public class InvocationThread extends AInvocationThread { |
---|
20 | |
---|
21 | private static final Logger logger = Logger.getLogger(InvocationThread.class.getName()); |
---|
22 | |
---|
23 | // RemoteObject |
---|
24 | private boolean idle; |
---|
25 | private long lastExec; |
---|
26 | |
---|
27 | public InvocationThread(RemoteObject obj) throws Exception { |
---|
28 | super(obj); |
---|
29 | this.lastExec = 0; |
---|
30 | this.idle = true; |
---|
31 | } |
---|
32 | |
---|
33 | @Override |
---|
34 | public void run() { |
---|
35 | while (!killed) { |
---|
36 | try { |
---|
37 | // Get the delivery |
---|
38 | Delivery delivery = consumer.nextDelivery(); |
---|
39 | |
---|
40 | // This thread gets busy |
---|
41 | pool.getBusy().incrementAndGet(); |
---|
42 | idle = false; |
---|
43 | |
---|
44 | executeTask(delivery); |
---|
45 | |
---|
46 | // The thread is now idle |
---|
47 | lastExec = System.currentTimeMillis(); |
---|
48 | idle = true; |
---|
49 | pool.getBusy().decrementAndGet(); |
---|
50 | |
---|
51 | } catch (InterruptedException i) { |
---|
52 | logger.error(i); |
---|
53 | } catch (ShutdownSignalException e) { |
---|
54 | logger.error(e); |
---|
55 | try { |
---|
56 | if (channel.isOpen()) { |
---|
57 | channel.close(); |
---|
58 | } |
---|
59 | startQueues(); |
---|
60 | } catch (Exception e1) { |
---|
61 | try { |
---|
62 | long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); |
---|
63 | Thread.sleep(milis); |
---|
64 | } catch (InterruptedException e2) { |
---|
65 | logger.error(e2); |
---|
66 | } |
---|
67 | logger.error(e1); |
---|
68 | } |
---|
69 | } catch (ConsumerCancelledException e) { |
---|
70 | logger.error(e); |
---|
71 | } catch (SerializerException e) { |
---|
72 | logger.error(e); |
---|
73 | } catch (Exception e) { |
---|
74 | e.printStackTrace(); |
---|
75 | logger.error(e); |
---|
76 | } |
---|
77 | |
---|
78 | } |
---|
79 | logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed"); |
---|
80 | } |
---|
81 | |
---|
82 | /** |
---|
83 | * This method starts the queues using the information got in the |
---|
84 | * environment. |
---|
85 | * |
---|
86 | * @throws Exception |
---|
87 | */ |
---|
88 | protected void startQueues() throws Exception { |
---|
89 | // Start channel |
---|
90 | channel = broker.getNewChannel(); |
---|
91 | |
---|
92 | // Get info about which exchange and queue will use |
---|
93 | String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, ""); |
---|
94 | String queue = reference; |
---|
95 | String routingKey = reference; |
---|
96 | |
---|
97 | // RemoteObject default queue |
---|
98 | boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false")); |
---|
99 | boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false")); |
---|
100 | boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false")); |
---|
101 | |
---|
102 | // Declares and bindings |
---|
103 | if (!exchange.equalsIgnoreCase("")) { // Default exchange case |
---|
104 | channel.exchangeDeclare(exchange, "direct"); |
---|
105 | } |
---|
106 | channel.queueDeclare(queue, durable, exclusive, autoDelete, null); |
---|
107 | if (!exchange.equalsIgnoreCase("")) { // Default exchange case |
---|
108 | channel.queueBind(queue, exchange, routingKey); |
---|
109 | } |
---|
110 | logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable |
---|
111 | + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete); |
---|
112 | |
---|
113 | /* |
---|
114 | * UID queue |
---|
115 | */ |
---|
116 | |
---|
117 | if (UID != null) { |
---|
118 | |
---|
119 | boolean uidDurable = false; |
---|
120 | boolean uidExclusive = true; |
---|
121 | boolean uidAutoDelete = true; |
---|
122 | |
---|
123 | channel.queueDeclare(UID, uidDurable, uidExclusive, uidAutoDelete, null); |
---|
124 | if (!exchange.equalsIgnoreCase("")) { // Default exchange case |
---|
125 | channel.queueBind(UID, exchange, UID); |
---|
126 | } |
---|
127 | // TODO logger queue |
---|
128 | // TODO UID queue should be reference + UID |
---|
129 | } |
---|
130 | |
---|
131 | /* |
---|
132 | * Consumer |
---|
133 | */ |
---|
134 | |
---|
135 | // Disable Round Robin behavior |
---|
136 | boolean autoAck = false; |
---|
137 | |
---|
138 | int prefetchCount = 1; |
---|
139 | channel.basicQos(prefetchCount); |
---|
140 | |
---|
141 | // Declare a new consumer |
---|
142 | consumer = new QueueingConsumer(channel); |
---|
143 | channel.basicConsume(queue, autoAck, consumer); |
---|
144 | if (UID != null) { |
---|
145 | channel.basicConsume(UID, autoAck, consumer); |
---|
146 | } |
---|
147 | } |
---|
148 | |
---|
149 | public long getLastExecution() { |
---|
150 | return lastExec; |
---|
151 | } |
---|
152 | |
---|
153 | public boolean isIdle() { |
---|
154 | return idle; |
---|
155 | } |
---|
156 | |
---|
157 | } |
---|