source: branches/supervisor/src/main/java/omq/server/RemoteThreadPool.java @ 100

Last change on this file since 100 was 100, checked in by stoda, 11 years ago

Idea of thread supervisor done

TODO test the idea, change the remote properties, etc

File size: 3.3 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.util.ArrayList;
5import java.util.List;
6import java.util.concurrent.atomic.AtomicInteger;
7
8import omq.common.broker.Broker;
9
10import com.rabbitmq.client.AMQP.Queue.DeclareOk;
11import com.rabbitmq.client.Channel;
12
13public class RemoteThreadPool extends Thread {
14        private List<InvocationThread> workers;
15        private AtomicInteger busy;
16        private int minPoolThreads;
17        private int maxPoolThreads;
18        private long refresh;
19        private long keepAliveTime;
20        private int maxMessagesPerThread;
21
22        private RemoteObject obj;
23        private Broker broker;
24        private boolean killed = false;
25
26        public RemoteThreadPool(int minPoolThreads, int maxPoolThreads, long refresh, long keepAliveTime, int maxMessagesPerThread, RemoteObject obj, Broker broker) {
27                this.minPoolThreads = minPoolThreads;
28                this.maxPoolThreads = maxPoolThreads;
29                this.refresh = refresh;
30                this.keepAliveTime = keepAliveTime;
31                this.maxMessagesPerThread = maxMessagesPerThread;
32                this.obj = obj;
33                this.broker = broker;
34
35                workers = new ArrayList<InvocationThread>(minPoolThreads);
36                busy = new AtomicInteger();
37        }
38
39        @Override
40        public void run() {
41
42                // Crear aquí tots els fils?
43
44                while (!killed) {
45
46                        try {
47                                Channel channel = broker.getChannel();
48                                DeclareOk dok = channel.queueDeclarePassive(obj.getRef());
49
50                                int numConsumers = dok.getConsumerCount();
51                                int numMessages = dok.getMessageCount();
52                                int division = numMessages / numConsumers;
53                                int numWorkers = workers.size();
54
55                                if (numWorkers < maxPoolThreads && division >= maxMessagesPerThread) {
56                                        // Create a new thread
57                                        InvocationThread worker = new InvocationThread(obj, broker);
58                                        workers.add(worker);
59                                        worker.start();
60                                } else if (numWorkers > minPoolThreads && busy.get() < numWorkers) {
61                                        // Kill idle threads
62                                        stopIdleThreads();
63                                }
64
65                                Thread.sleep(refresh);
66
67                        } catch (Exception e) {
68
69                        }
70                }
71
72        }
73
74        private void stopIdleThreads() {
75                long now = System.currentTimeMillis();
76
77                for (InvocationThread worker : workers) {
78                        long lastExec = worker.getLastExecution();
79                        if (worker.isIdle() && (lastExec - now) > keepAliveTime) {
80                                // Kill thread
81                                try {
82                                        worker.kill();
83                                } catch (IOException e) {
84                                        // TODO Auto-generated catch block
85                                        e.printStackTrace();
86                                }
87
88                        }
89                }
90        }
91
92        public List<InvocationThread> getWorkers() {
93                return workers;
94        }
95
96        public void setWorkers(List<InvocationThread> workers) {
97                this.workers = workers;
98        }
99
100        public AtomicInteger getBusy() {
101                return busy;
102        }
103
104        public void setBusy(AtomicInteger busy) {
105                this.busy = busy;
106        }
107
108        public int getMinPoolThreads() {
109                return minPoolThreads;
110        }
111
112        public void setMinPoolThreads(int minPoolThreads) {
113                this.minPoolThreads = minPoolThreads;
114        }
115
116        public int getMaxPoolThreads() {
117                return maxPoolThreads;
118        }
119
120        public void setMaxPoolThreads(int maxPoolThreads) {
121                this.maxPoolThreads = maxPoolThreads;
122        }
123
124        public long getRefresh() {
125                return refresh;
126        }
127
128        public void setRefresh(long refresh) {
129                this.refresh = refresh;
130        }
131
132        public long getKeepAliveTime() {
133                return keepAliveTime;
134        }
135
136        public void setKeepAliveTime(long keepAliveTime) {
137                this.keepAliveTime = keepAliveTime;
138        }
139
140        public int getMaxMessagesPerThread() {
141                return maxMessagesPerThread;
142        }
143
144        public void setMaxMessagesPerThread(int maxMessagesPerThread) {
145                this.maxMessagesPerThread = maxMessagesPerThread;
146        }
147
148}
Note: See TracBrowser for help on using the repository browser.