source: branches/supervisor/src/main/java/omq/server/InvocationThread.java

Last change on this file was 108, checked in by stoda, 11 years ago

Refactoring to enable multinvocation thread done.
Now there's a multiinvocation thread which listens to the multiqueue

File size: 4.0 KB
Line 
1package omq.server;
2
3import omq.common.util.ParameterQueue;
4import omq.exception.SerializerException;
5
6import org.apache.log4j.Logger;
7
8import com.rabbitmq.client.ConsumerCancelledException;
9import com.rabbitmq.client.QueueingConsumer;
10import com.rabbitmq.client.QueueingConsumer.Delivery;
11import com.rabbitmq.client.ShutdownSignalException;
12
13/**
14 * An invocationThread waits for requests an invokes them.
15 *
16 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
17 *
18 */
19public class InvocationThread extends AInvocationThread {
20
21        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
22
23        // RemoteObject
24        private boolean idle;
25        private long lastExec;
26
27        public InvocationThread(RemoteObject obj) throws Exception {
28                super(obj);
29                this.lastExec = 0;
30                this.idle = true;
31        }
32
33        @Override
34        public void run() {
35                while (!killed) {
36                        try {
37                                // Get the delivery
38                                Delivery delivery = consumer.nextDelivery();
39
40                                // This thread gets busy
41                                pool.getBusy().incrementAndGet();
42                                idle = false;
43
44                                executeTask(delivery);
45
46                                // The thread is now idle
47                                lastExec = System.currentTimeMillis();
48                                idle = true;
49                                pool.getBusy().decrementAndGet();
50
51                        } catch (InterruptedException i) {
52                                logger.error(i);
53                        } catch (ShutdownSignalException e) {
54                                logger.error(e);
55                                try {
56                                        if (channel.isOpen()) {
57                                                channel.close();
58                                        }
59                                        startQueues();
60                                } catch (Exception e1) {
61                                        try {
62                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
63                                                Thread.sleep(milis);
64                                        } catch (InterruptedException e2) {
65                                                logger.error(e2);
66                                        }
67                                        logger.error(e1);
68                                }
69                        } catch (ConsumerCancelledException e) {
70                                logger.error(e);
71                        } catch (SerializerException e) {
72                                logger.error(e);
73                        } catch (Exception e) {
74                                e.printStackTrace();
75                                logger.error(e);
76                        }
77
78                }
79                logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed");
80        }
81
82        /**
83         * This method starts the queues using the information got in the
84         * environment.
85         *
86         * @throws Exception
87         */
88        protected void startQueues() throws Exception {
89                // Start channel
90                channel = broker.getNewChannel();
91
92                // Get info about which exchange and queue will use
93                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
94                String queue = reference;
95                String routingKey = reference;
96
97                // RemoteObject default queue
98                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
99                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
100                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
101
102                // Declares and bindings
103                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
104                        channel.exchangeDeclare(exchange, "direct");
105                }
106                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
107                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
108                        channel.queueBind(queue, exchange, routingKey);
109                }
110                logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable
111                                + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete);
112
113                /*
114                 * UID queue
115                 */
116
117                if (UID != null) {
118
119                        boolean uidDurable = false;
120                        boolean uidExclusive = true;
121                        boolean uidAutoDelete = true;
122
123                        channel.queueDeclare(UID, uidDurable, uidExclusive, uidAutoDelete, null);
124                        if (!exchange.equalsIgnoreCase("")) { // Default exchange case
125                                channel.queueBind(UID, exchange, UID);
126                        }
127                        // TODO logger queue
128                        // TODO UID queue should be reference + UID
129                }
130
131                /*
132                 * Consumer
133                 */
134
135                // Disable Round Robin behavior
136                boolean autoAck = false;
137
138                int prefetchCount = 1;
139                channel.basicQos(prefetchCount);
140
141                // Declare a new consumer
142                consumer = new QueueingConsumer(channel);
143                channel.basicConsume(queue, autoAck, consumer);
144                if (UID != null) {
145                        channel.basicConsume(UID, autoAck, consumer);
146                }
147        }
148
149        public long getLastExecution() {
150                return lastExec;
151        }
152
153        public boolean isIdle() {
154                return idle;
155        }
156
157}
Note: See TracBrowser for help on using the repository browser.