source: branches/supervisor/src/main/java/omq/server/MultiInvocationThread.java

Last change on this file was 108, checked in by stoda, 11 years ago

Refactoring to enable multinvocation thread done.
Now there's a multiinvocation thread which listens to the multiqueue

File size: 3.2 KB
Line 
1package omq.server;
2
3import omq.common.util.ParameterQueue;
4import omq.exception.SerializerException;
5
6import org.apache.log4j.Logger;
7
8import com.rabbitmq.client.ConsumerCancelledException;
9import com.rabbitmq.client.QueueingConsumer;
10import com.rabbitmq.client.QueueingConsumer.Delivery;
11import com.rabbitmq.client.ShutdownSignalException;
12
13public class MultiInvocationThread extends AInvocationThread {
14
15        private static final Logger logger = Logger.getLogger(MultiInvocationThread.class.getName());
16        private static final String multi = "multi#";
17
18        // Consumer
19        private String multiQueue;
20
21        public MultiInvocationThread(RemoteObject obj) throws Exception {
22                super(obj);
23        }
24
25        @Override
26        public void run() {
27                while (!killed) {
28                        try {
29                                // Get the delivery
30                                Delivery delivery = consumer.nextDelivery();
31                                // This thread does not need to set busy because it's mandatory
32                                // to exist
33                                executeTask(delivery);
34                        } catch (InterruptedException i) {
35                                logger.error(i);
36                        } catch (ShutdownSignalException e) {
37                                logger.error(e);
38                                try {
39                                        if (channel.isOpen()) {
40                                                channel.close();
41                                        }
42                                        startQueues();
43                                } catch (Exception e1) {
44                                        try {
45                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
46                                                Thread.sleep(milis);
47                                        } catch (InterruptedException e2) {
48                                                logger.error(e2);
49                                        }
50                                        logger.error(e1);
51                                }
52                        } catch (ConsumerCancelledException e) {
53                                logger.error(e);
54                        } catch (SerializerException e) {
55                                logger.error(e);
56                        } catch (Exception e) {
57                                e.printStackTrace();
58                                logger.error(e);
59                        }
60
61                }
62                logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed");
63        }
64
65        @Override
66        protected void startQueues() throws Exception {
67                // Start channel
68                channel = broker.getNewChannel();
69
70                /*
71                 * Multi queue, exclusive per each instance
72                 */
73
74                // Get info about the multiQueue
75                String multiExchange = multi + reference;
76                // TODO:String multiExchange = multi + exchange + reference;
77                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
78
79                // Multi queue (exclusive queue per remoteObject)
80                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
81                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
82                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
83
84                // Declares and bindings
85                channel.exchangeDeclare(multiExchange, "fanout");
86                if (multiQueue == null) {
87                        multiQueue = channel.queueDeclare().getQueue();
88                } else {
89                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
90                }
91                channel.queueBind(multiQueue, multiExchange, "");
92                logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: "
93                                + multiDurable + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
94
95                /*
96                 * Consumer
97                 */
98
99                // Disable Round Robin behavior
100                boolean autoAck = false;
101
102                // Declare a new consumer
103                consumer = new QueueingConsumer(channel);
104                channel.basicConsume(multiQueue, autoAck, consumer);
105        }
106
107}
Note: See TracBrowser for help on using the repository browser.