source: branches/supervisor/src/main/java/omq/server/InvocationThread.java

Last change on this file was 108, checked in by stoda, 11 years ago

Refactoring to enable multinvocation thread done.
Now there's a multiinvocation thread which listens to the multiqueue

File size: 4.0 KB
RevLine 
[44]1package omq.server;
2
[96]3import omq.common.util.ParameterQueue;
4import omq.exception.SerializerException;
[44]5
[49]6import org.apache.log4j.Logger;
7
[96]8import com.rabbitmq.client.ConsumerCancelledException;
9import com.rabbitmq.client.QueueingConsumer;
[44]10import com.rabbitmq.client.QueueingConsumer.Delivery;
[96]11import com.rabbitmq.client.ShutdownSignalException;
[44]12
13/**
[83]14 * An invocationThread waits for requests an invokes them.
[44]15 *
16 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
17 *
18 */
[108]19public class InvocationThread extends AInvocationThread {
[96]20
[49]21        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
[96]22
23        // RemoteObject
[100]24        private boolean idle;
25        private long lastExec;
[96]26
[101]27        public InvocationThread(RemoteObject obj) throws Exception {
[108]28                super(obj);
[100]29                this.lastExec = 0;
30                this.idle = true;
[44]31        }
32
33        @Override
34        public void run() {
35                while (!killed) {
36                        try {
37                                // Get the delivery
[96]38                                Delivery delivery = consumer.nextDelivery();
[44]39
[100]40                                // This thread gets busy
41                                pool.getBusy().incrementAndGet();
42                                idle = false;
43
[108]44                                executeTask(delivery);
[47]45
[100]46                                // The thread is now idle
47                                lastExec = System.currentTimeMillis();
48                                idle = true;
49                                pool.getBusy().decrementAndGet();
50
[44]51                        } catch (InterruptedException i) {
[49]52                                logger.error(i);
[96]53                        } catch (ShutdownSignalException e) {
54                                logger.error(e);
55                                try {
56                                        if (channel.isOpen()) {
57                                                channel.close();
58                                        }
59                                        startQueues();
60                                } catch (Exception e1) {
61                                        try {
62                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
63                                                Thread.sleep(milis);
64                                        } catch (InterruptedException e2) {
65                                                logger.error(e2);
66                                        }
67                                        logger.error(e1);
68                                }
69                        } catch (ConsumerCancelledException e) {
70                                logger.error(e);
71                        } catch (SerializerException e) {
72                                logger.error(e);
[44]73                        } catch (Exception e) {
[101]74                                e.printStackTrace();
[96]75                                logger.error(e);
[44]76                        }
77
78                }
[102]79                logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed");
[44]80        }
81
[96]82        /**
83         * This method starts the queues using the information got in the
84         * environment.
85         *
86         * @throws Exception
87         */
[108]88        protected void startQueues() throws Exception {
[96]89                // Start channel
90                channel = broker.getNewChannel();
91
92                // Get info about which exchange and queue will use
93                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
[105]94                String queue = reference;
95                String routingKey = reference;
[96]96
97                // RemoteObject default queue
98                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
99                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
100                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
101
102                // Declares and bindings
103                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
104                        channel.exchangeDeclare(exchange, "direct");
105                }
106                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
107                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
108                        channel.queueBind(queue, exchange, routingKey);
109                }
[108]110                logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable
111                                + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete);
[96]112
113                /*
[106]114                 * UID queue
115                 */
116
117                if (UID != null) {
118
119                        boolean uidDurable = false;
120                        boolean uidExclusive = true;
121                        boolean uidAutoDelete = true;
122
123                        channel.queueDeclare(UID, uidDurable, uidExclusive, uidAutoDelete, null);
124                        if (!exchange.equalsIgnoreCase("")) { // Default exchange case
125                                channel.queueBind(UID, exchange, UID);
126                        }
[108]127                        // TODO logger queue
128                        // TODO UID queue should be reference + UID
[106]129                }
130
131                /*
[96]132                 * Consumer
133                 */
134
[108]135                // Disable Round Robin behavior
[96]136                boolean autoAck = false;
137
138                int prefetchCount = 1;
139                channel.basicQos(prefetchCount);
140
141                // Declare a new consumer
142                consumer = new QueueingConsumer(channel);
143                channel.basicConsume(queue, autoAck, consumer);
[106]144                if (UID != null) {
145                        channel.basicConsume(UID, autoAck, consumer);
146                }
[96]147        }
148
[100]149        public long getLastExecution() {
150                return lastExec;
151        }
152
153        public boolean isIdle() {
154                return idle;
155        }
156
[44]157}
Note: See TracBrowser for help on using the repository browser.