source: branches/supervisor/src/main/java/omq/server/RemoteThreadPool.java @ 101

Last change on this file since 101 was 101, checked in by stoda, 11 years ago

PoolThreadSupervisor? done

TODO: revise all threads

File size: 4.6 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.util.ArrayList;
5import java.util.List;
6import java.util.concurrent.atomic.AtomicInteger;
7
8import org.apache.log4j.Logger;
9
10import omq.common.broker.Broker;
11
12import com.rabbitmq.client.AMQP.Queue.DeclareOk;
13import com.rabbitmq.client.Channel;
14
15/**
16 *
17 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
18 *
19 */
20
21public class RemoteThreadPool extends Thread {
22        private static final Logger logger = Logger.getLogger(RemoteThreadPool.class.getName());
23        private List<InvocationThread> workers;
24        private AtomicInteger busy;
25        private int minPoolThreads;
26        private int maxPoolThreads;
27        private long refresh;
28        private long keepAliveTime;
29        private int maxMessagesPerThread;
30
31        private RemoteObject obj;
32        private Broker broker;
33        private boolean killed = false;
34
35        public RemoteThreadPool(int minPoolThreads, int maxPoolThreads, long refresh, long keepAliveTime, int maxMessagesPerThread, RemoteObject obj, Broker broker) {
36                this.minPoolThreads = minPoolThreads;
37                this.maxPoolThreads = maxPoolThreads;
38                this.refresh = refresh;
39                this.keepAliveTime = keepAliveTime;
40                this.maxMessagesPerThread = maxMessagesPerThread;
41                this.obj = obj;
42                this.broker = broker;
43
44                workers = new ArrayList<InvocationThread>(minPoolThreads);
45                busy = new AtomicInteger();
46        }
47
48        @Override
49        public void run() {
50
51                /*
52                 * Create and start minPoolThreads
53                 */
54                logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads + ", refresh time: "
55                                + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread);
56
57                for (int i = 0; i < minPoolThreads; i++) {
58                        try {
59                                InvocationThread iThread = new InvocationThread(obj);
60                                workers.add(iThread);
61                                iThread.start();
62                        } catch (Exception e) {
63                                logger.error("Error while creating pool threads", e);
64                                e.printStackTrace();
65                        }
66                }
67
68                while (!killed) {
69
70                        try {
71                                Channel channel = broker.getChannel();
72                                DeclareOk dok = channel.queueDeclarePassive(obj.getRef());
73
74                                int numConsumers = dok.getConsumerCount();
75                                int numMessages = dok.getMessageCount();
76                                int division = numMessages / numConsumers;
77                                int numWorkers = workers.size();
78
79                                if (numWorkers < maxPoolThreads && division >= maxMessagesPerThread) {
80                                        // Create a new thread
81                                        System.out.println("Add worker");
82                                        InvocationThread worker = new InvocationThread(obj);
83                                        workers.add(worker);
84                                        worker.start();
85                                } else if (numWorkers > minPoolThreads && busy.get() < numWorkers) {
86                                        // Kill idle threads
87                                        System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = " + busy.get());
88                                        stopIdleThreads();
89                                }
90
91                                Thread.sleep(refresh);
92
93                        } catch (Exception e) {
94                                e.printStackTrace();
95                                logger.error(e);
96                        }
97                }
98
99        }
100
101        private void stopIdleThreads() {
102                long now = System.currentTimeMillis();
103
104                int i = 0;
105                for (InvocationThread worker : workers) {
106                        // Ensure there are at least minThreads available
107                        if (workers.size() == minPoolThreads) {
108                                break;
109                        }
110                        long lastExec = worker.getLastExecution();
111                        System.out.println("last - now = " + (now - lastExec) + " keep alive = " + keepAliveTime);
112                        if (worker.isIdle() && (now - lastExec) > keepAliveTime) {
113                                // Kill thread
114                                try {
115                                        worker.kill();
116                                        workers.remove(i);
117                                } catch (IOException e) {
118                                        // TODO Auto-generated catch block
119                                        e.printStackTrace();
120                                }
121
122                        }
123                        i++;
124                }
125        }
126
127        public void kill() throws IOException {
128                killed = true;
129                for (InvocationThread iThread : workers) {
130                        iThread.kill();
131                }
132                interrupt();
133        }
134
135        public List<InvocationThread> getWorkers() {
136                return workers;
137        }
138
139        public void setWorkers(List<InvocationThread> workers) {
140                this.workers = workers;
141        }
142
143        public AtomicInteger getBusy() {
144                return busy;
145        }
146
147        public void setBusy(AtomicInteger busy) {
148                this.busy = busy;
149        }
150
151        public int getMinPoolThreads() {
152                return minPoolThreads;
153        }
154
155        public void setMinPoolThreads(int minPoolThreads) {
156                this.minPoolThreads = minPoolThreads;
157        }
158
159        public int getMaxPoolThreads() {
160                return maxPoolThreads;
161        }
162
163        public void setMaxPoolThreads(int maxPoolThreads) {
164                this.maxPoolThreads = maxPoolThreads;
165        }
166
167        public long getRefresh() {
168                return refresh;
169        }
170
171        public void setRefresh(long refresh) {
172                this.refresh = refresh;
173        }
174
175        public long getKeepAliveTime() {
176                return keepAliveTime;
177        }
178
179        public void setKeepAliveTime(long keepAliveTime) {
180                this.keepAliveTime = keepAliveTime;
181        }
182
183        public int getMaxMessagesPerThread() {
184                return maxMessagesPerThread;
185        }
186
187        public void setMaxMessagesPerThread(int maxMessagesPerThread) {
188                this.maxMessagesPerThread = maxMessagesPerThread;
189        }
190
191}
Note: See TracBrowser for help on using the repository browser.