source: branches/supervisor/src/main/java/omq/server/RemoteThreadPool.java

Last change on this file was 108, checked in by stoda, 11 years ago

Refactoring to enable multinvocation thread done.
Now there's a multiinvocation thread which listens to the multiqueue

File size: 4.8 KB
Line 
1package omq.server;
2
3import java.io.IOException;
4import java.util.ArrayList;
5import java.util.List;
6import java.util.ListIterator;
7import java.util.concurrent.atomic.AtomicInteger;
8
9import org.apache.log4j.Logger;
10
11import omq.common.broker.Broker;
12
13import com.rabbitmq.client.AMQP.Queue.DeclareOk;
14import com.rabbitmq.client.Channel;
15
16/**
17 *
18 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
19 *
20 */
21
22public class RemoteThreadPool extends Thread {
23        private static final Logger logger = Logger.getLogger(RemoteThreadPool.class.getName());
24        private List<InvocationThread> workers;
25        private MultiInvocationThread multiWorker;
26        private AtomicInteger busy;
27        private int minPoolThreads;
28        private int maxPoolThreads;
29        private long refresh;
30        private long keepAliveTime;
31        private int maxMessagesPerThread;
32
33        private RemoteObject obj;
34        private Broker broker;
35        private boolean killed = false;
36
37        public RemoteThreadPool(int minPoolThreads, int maxPoolThreads, long refresh, long keepAliveTime, int maxMessagesPerThread,
38                        RemoteObject obj, Broker broker) {
39                this.minPoolThreads = minPoolThreads;
40                this.maxPoolThreads = maxPoolThreads;
41                this.refresh = refresh;
42                this.keepAliveTime = keepAliveTime;
43                this.maxMessagesPerThread = maxMessagesPerThread;
44                this.obj = obj;
45                this.broker = broker;
46
47                workers = new ArrayList<InvocationThread>(minPoolThreads);
48                busy = new AtomicInteger();
49        }
50
51        @Override
52        public void run() {
53
54                /*
55                 * Create and start minPoolThreads
56                 */
57                logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads
58                                + ", refresh time: " + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread);
59
60                try {
61                        multiWorker = new MultiInvocationThread(obj);
62                        multiWorker.start();
63                } catch (Exception e1) {
64                        // TODO Auto-generated catch block
65                        e1.printStackTrace();
66                }
67
68                for (int i = 0; i < minPoolThreads; i++) {
69                        try {
70                                InvocationThread iThread = new InvocationThread(obj);
71                                workers.add(iThread);
72                                iThread.start();
73                        } catch (Exception e) {
74                                logger.error("Error while creating pool threads", e);
75                                e.printStackTrace();
76                        }
77                }
78
79                while (!killed) {
80
81                        try {
82                                Channel channel = broker.getChannel();
83                                DeclareOk dok = channel.queueDeclarePassive(obj.getRef());
84
85                                int numConsumers = dok.getConsumerCount();
86                                int numMessages = dok.getMessageCount();
87                                int division = numMessages / numConsumers;
88                                int numWorkers = workers.size();
89
90                                if (numWorkers < maxPoolThreads && division >= maxMessagesPerThread) {
91                                        // Create a new thread
92                                        System.out.println("Add worker");
93                                        InvocationThread worker = new InvocationThread(obj);
94                                        workers.add(worker);
95                                        worker.start();
96                                } else if (numWorkers > minPoolThreads && busy.get() < numWorkers) {
97                                        // Kill idle threads
98                                        System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = "
99                                                        + busy.get());
100                                        stopIdleThreads();
101                                }
102
103                                Thread.sleep(refresh);
104
105                        } catch (Exception e) {
106                                e.printStackTrace();
107                                logger.error(e);
108                        }
109                }
110
111        }
112
113        private void stopIdleThreads() {
114                long now = System.currentTimeMillis();
115
116                ListIterator<InvocationThread> lIter = workers.listIterator();
117                while (workers.size() > minPoolThreads && lIter.hasNext()) {
118                        InvocationThread worker = lIter.next();
119                        long lastExec = worker.getLastExecution();
120                        System.out.println("last - now = " + (now - lastExec) + " keep alive = " + keepAliveTime);
121                        if (worker.isIdle() && (now - lastExec) > keepAliveTime) {
122                                // Kill thread
123                                try {
124                                        worker.kill();
125                                        lIter.remove();
126                                } catch (IOException e) {
127                                        logger.error(e);
128                                }
129
130                        }
131                }
132        }
133
134        public void kill() throws IOException {
135                killed = true;
136                for (InvocationThread iThread : workers) {
137                        iThread.kill();
138                }
139                interrupt();
140        }
141
142        public List<InvocationThread> getWorkers() {
143                return workers;
144        }
145
146        public void setWorkers(List<InvocationThread> workers) {
147                this.workers = workers;
148        }
149
150        public AtomicInteger getBusy() {
151                return busy;
152        }
153
154        public void setBusy(AtomicInteger busy) {
155                this.busy = busy;
156        }
157
158        public int getMinPoolThreads() {
159                return minPoolThreads;
160        }
161
162        public void setMinPoolThreads(int minPoolThreads) {
163                this.minPoolThreads = minPoolThreads;
164        }
165
166        public int getMaxPoolThreads() {
167                return maxPoolThreads;
168        }
169
170        public void setMaxPoolThreads(int maxPoolThreads) {
171                this.maxPoolThreads = maxPoolThreads;
172        }
173
174        public long getRefresh() {
175                return refresh;
176        }
177
178        public void setRefresh(long refresh) {
179                this.refresh = refresh;
180        }
181
182        public long getKeepAliveTime() {
183                return keepAliveTime;
184        }
185
186        public void setKeepAliveTime(long keepAliveTime) {
187                this.keepAliveTime = keepAliveTime;
188        }
189
190        public int getMaxMessagesPerThread() {
191                return maxMessagesPerThread;
192        }
193
194        public void setMaxMessagesPerThread(int maxMessagesPerThread) {
195                this.maxMessagesPerThread = maxMessagesPerThread;
196        }
197
198}
Note: See TracBrowser for help on using the repository browser.