package omq.server; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import omq.common.broker.Broker; import com.rabbitmq.client.AMQP.Queue.DeclareOk; import com.rabbitmq.client.Channel; /** * * @author Sergi Toda * */ public class RemoteThreadPool extends Thread { private static final Logger logger = Logger.getLogger(RemoteThreadPool.class.getName()); private List workers; private MultiInvocationThread multiWorker; private AtomicInteger busy; private int minPoolThreads; private int maxPoolThreads; private long refresh; private long keepAliveTime; private int maxMessagesPerThread; private RemoteObject obj; private Broker broker; private boolean killed = false; public RemoteThreadPool(int minPoolThreads, int maxPoolThreads, long refresh, long keepAliveTime, int maxMessagesPerThread, RemoteObject obj, Broker broker) { this.minPoolThreads = minPoolThreads; this.maxPoolThreads = maxPoolThreads; this.refresh = refresh; this.keepAliveTime = keepAliveTime; this.maxMessagesPerThread = maxMessagesPerThread; this.obj = obj; this.broker = broker; workers = new ArrayList(minPoolThreads); busy = new AtomicInteger(); } @Override public void run() { /* * Create and start minPoolThreads */ logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads + ", refresh time: " + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread); try { multiWorker = new MultiInvocationThread(obj); multiWorker.start(); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } for (int i = 0; i < minPoolThreads; i++) { try { InvocationThread iThread = new InvocationThread(obj); workers.add(iThread); iThread.start(); } catch (Exception e) { logger.error("Error while creating pool threads", e); e.printStackTrace(); } } while (!killed) { try { Channel channel = broker.getChannel(); DeclareOk dok = channel.queueDeclarePassive(obj.getRef()); int numConsumers = dok.getConsumerCount(); int numMessages = dok.getMessageCount(); int division = numMessages / numConsumers; int numWorkers = workers.size(); if (numWorkers < maxPoolThreads && division >= maxMessagesPerThread) { // Create a new thread System.out.println("Add worker"); InvocationThread worker = new InvocationThread(obj); workers.add(worker); worker.start(); } else if (numWorkers > minPoolThreads && busy.get() < numWorkers) { // Kill idle threads System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = " + busy.get()); stopIdleThreads(); } Thread.sleep(refresh); } catch (Exception e) { e.printStackTrace(); logger.error(e); } } } private void stopIdleThreads() { long now = System.currentTimeMillis(); ListIterator lIter = workers.listIterator(); while (workers.size() > minPoolThreads && lIter.hasNext()) { InvocationThread worker = lIter.next(); long lastExec = worker.getLastExecution(); System.out.println("last - now = " + (now - lastExec) + " keep alive = " + keepAliveTime); if (worker.isIdle() && (now - lastExec) > keepAliveTime) { // Kill thread try { worker.kill(); lIter.remove(); } catch (IOException e) { logger.error(e); } } } } public void kill() throws IOException { killed = true; for (InvocationThread iThread : workers) { iThread.kill(); } interrupt(); } public List getWorkers() { return workers; } public void setWorkers(List workers) { this.workers = workers; } public AtomicInteger getBusy() { return busy; } public void setBusy(AtomicInteger busy) { this.busy = busy; } public int getMinPoolThreads() { return minPoolThreads; } public void setMinPoolThreads(int minPoolThreads) { this.minPoolThreads = minPoolThreads; } public int getMaxPoolThreads() { return maxPoolThreads; } public void setMaxPoolThreads(int maxPoolThreads) { this.maxPoolThreads = maxPoolThreads; } public long getRefresh() { return refresh; } public void setRefresh(long refresh) { this.refresh = refresh; } public long getKeepAliveTime() { return keepAliveTime; } public void setKeepAliveTime(long keepAliveTime) { this.keepAliveTime = keepAliveTime; } public int getMaxMessagesPerThread() { return maxMessagesPerThread; } public void setMaxMessagesPerThread(int maxMessagesPerThread) { this.maxMessagesPerThread = maxMessagesPerThread; } }