source: branches/supervisor/src/main/java/omq/server/RemoteThreadPool.java @ 106

Last change on this file since 106 was 102, checked in by stoda, 11 years ago

All tests working

TODO sometimes exceptiontest fails

File size: 4.6 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.util.ArrayList;
5import java.util.List;
6import java.util.ListIterator;
7import java.util.concurrent.atomic.AtomicInteger;
8
9import org.apache.log4j.Logger;
10
11import omq.common.broker.Broker;
12
13import com.rabbitmq.client.AMQP.Queue.DeclareOk;
14import com.rabbitmq.client.Channel;
15
16/**
17 *
18 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
19 *
20 */
21
22public class RemoteThreadPool extends Thread {
23        private static final Logger logger = Logger.getLogger(RemoteThreadPool.class.getName());
24        private List<InvocationThread> workers;
25        private AtomicInteger busy;
26        private int minPoolThreads;
27        private int maxPoolThreads;
28        private long refresh;
29        private long keepAliveTime;
30        private int maxMessagesPerThread;
31
32        private RemoteObject obj;
33        private Broker broker;
34        private boolean killed = false;
35
36        public RemoteThreadPool(int minPoolThreads, int maxPoolThreads, long refresh, long keepAliveTime, int maxMessagesPerThread, RemoteObject obj, Broker broker) {
37                this.minPoolThreads = minPoolThreads;
38                this.maxPoolThreads = maxPoolThreads;
39                this.refresh = refresh;
40                this.keepAliveTime = keepAliveTime;
41                this.maxMessagesPerThread = maxMessagesPerThread;
42                this.obj = obj;
43                this.broker = broker;
44
45                workers = new ArrayList<InvocationThread>(minPoolThreads);
46                busy = new AtomicInteger();
47        }
48
49        @Override
50        public void run() {
51
52                /*
53                 * Create and start minPoolThreads
54                 */
55                logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads + ", refresh time: "
56                                + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread);
57
58                for (int i = 0; i < minPoolThreads; i++) {
59                        try {
60                                InvocationThread iThread = new InvocationThread(obj);
61                                workers.add(iThread);
62                                iThread.start();
63                        } catch (Exception e) {
64                                logger.error("Error while creating pool threads", e);
65                                e.printStackTrace();
66                        }
67                }
68
69                while (!killed) {
70
71                        try {
72                                Channel channel = broker.getChannel();
73                                DeclareOk dok = channel.queueDeclarePassive(obj.getRef());
74
75                                int numConsumers = dok.getConsumerCount();
76                                int numMessages = dok.getMessageCount();
77                                int division = numMessages / numConsumers;
78                                int numWorkers = workers.size();
79
80                                if (numWorkers < maxPoolThreads && division >= maxMessagesPerThread) {
81                                        // Create a new thread
82                                        System.out.println("Add worker");
83                                        InvocationThread worker = new InvocationThread(obj);
84                                        workers.add(worker);
85                                        worker.start();
86                                } else if (numWorkers > minPoolThreads && busy.get() < numWorkers) {
87                                        // Kill idle threads
88                                        System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = " + busy.get());
89                                        stopIdleThreads();
90                                }
91
92                                Thread.sleep(refresh);
93
94                        } catch (Exception e) {
95                                e.printStackTrace();
96                                logger.error(e);
97                        }
98                }
99
100        }
101
102        private void stopIdleThreads() {
103                long now = System.currentTimeMillis();
104
105                ListIterator<InvocationThread> lIter = workers.listIterator();
106                while (workers.size() > minPoolThreads && lIter.hasNext()) {
107                        InvocationThread worker = lIter.next();
108                        long lastExec = worker.getLastExecution();
109                        System.out.println("last - now = " + (now - lastExec) + " keep alive = " + keepAliveTime);
110                        if (worker.isIdle() && (now - lastExec) > keepAliveTime) {
111                                // Kill thread
112                                try {
113                                        worker.kill();
114                                        lIter.remove();
115                                } catch (IOException e) {
116                                        logger.error(e);
117                                }
118
119                        }
120                }
121        }
122
123        public void kill() throws IOException {
124                killed = true;
125                for (InvocationThread iThread : workers) {
126                        iThread.kill();
127                }
128                interrupt();
129        }
130
131        public List<InvocationThread> getWorkers() {
132                return workers;
133        }
134
135        public void setWorkers(List<InvocationThread> workers) {
136                this.workers = workers;
137        }
138
139        public AtomicInteger getBusy() {
140                return busy;
141        }
142
143        public void setBusy(AtomicInteger busy) {
144                this.busy = busy;
145        }
146
147        public int getMinPoolThreads() {
148                return minPoolThreads;
149        }
150
151        public void setMinPoolThreads(int minPoolThreads) {
152                this.minPoolThreads = minPoolThreads;
153        }
154
155        public int getMaxPoolThreads() {
156                return maxPoolThreads;
157        }
158
159        public void setMaxPoolThreads(int maxPoolThreads) {
160                this.maxPoolThreads = maxPoolThreads;
161        }
162
163        public long getRefresh() {
164                return refresh;
165        }
166
167        public void setRefresh(long refresh) {
168                this.refresh = refresh;
169        }
170
171        public long getKeepAliveTime() {
172                return keepAliveTime;
173        }
174
175        public void setKeepAliveTime(long keepAliveTime) {
176                this.keepAliveTime = keepAliveTime;
177        }
178
179        public int getMaxMessagesPerThread() {
180                return maxMessagesPerThread;
181        }
182
183        public void setMaxMessagesPerThread(int maxMessagesPerThread) {
184                this.maxMessagesPerThread = maxMessagesPerThread;
185        }
186
187}
Note: See TracBrowser for help on using the repository browser.