Ignore:
Timestamp:
10/11/13 13:44:19 (11 years ago)
Author:
stoda
Message:

PoolThreadSupervisor? done

TODO: revise all threads

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/server/RemoteThreadPool.java

    r100 r101  
    66import java.util.concurrent.atomic.AtomicInteger;
    77
     8import org.apache.log4j.Logger;
     9
    810import omq.common.broker.Broker;
    911
     
    1113import com.rabbitmq.client.Channel;
    1214
     15/**
     16 *
     17 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
     18 *
     19 */
     20
    1321public class RemoteThreadPool extends Thread {
     22        private static final Logger logger = Logger.getLogger(RemoteThreadPool.class.getName());
    1423        private List<InvocationThread> workers;
    1524        private AtomicInteger busy;
     
    4049        public void run() {
    4150
    42                 // Crear aquí tots els fils?
     51                /*
     52                 * Create and start minPoolThreads
     53                 */
     54                logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads + ", refresh time: "
     55                                + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread);
     56
     57                for (int i = 0; i < minPoolThreads; i++) {
     58                        try {
     59                                InvocationThread iThread = new InvocationThread(obj);
     60                                workers.add(iThread);
     61                                iThread.start();
     62                        } catch (Exception e) {
     63                                logger.error("Error while creating pool threads", e);
     64                                e.printStackTrace();
     65                        }
     66                }
    4367
    4468                while (!killed) {
     
    5579                                if (numWorkers < maxPoolThreads && division >= maxMessagesPerThread) {
    5680                                        // Create a new thread
    57                                         InvocationThread worker = new InvocationThread(obj, broker);
     81                                        System.out.println("Add worker");
     82                                        InvocationThread worker = new InvocationThread(obj);
    5883                                        workers.add(worker);
    5984                                        worker.start();
    6085                                } else if (numWorkers > minPoolThreads && busy.get() < numWorkers) {
    6186                                        // Kill idle threads
     87                                        System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = " + busy.get());
    6288                                        stopIdleThreads();
    6389                                }
     
    6692
    6793                        } catch (Exception e) {
    68 
     94                                e.printStackTrace();
     95                                logger.error(e);
    6996                        }
    7097                }
     
    75102                long now = System.currentTimeMillis();
    76103
     104                int i = 0;
    77105                for (InvocationThread worker : workers) {
     106                        // Ensure there are at least minThreads available
     107                        if (workers.size() == minPoolThreads) {
     108                                break;
     109                        }
    78110                        long lastExec = worker.getLastExecution();
    79                         if (worker.isIdle() && (lastExec - now) > keepAliveTime) {
     111                        System.out.println("last - now = " + (now - lastExec) + " keep alive = " + keepAliveTime);
     112                        if (worker.isIdle() && (now - lastExec) > keepAliveTime) {
    80113                                // Kill thread
    81114                                try {
    82115                                        worker.kill();
     116                                        workers.remove(i);
    83117                                } catch (IOException e) {
    84118                                        // TODO Auto-generated catch block
     
    87121
    88122                        }
     123                        i++;
    89124                }
     125        }
     126
     127        public void kill() throws IOException {
     128                killed = true;
     129                for (InvocationThread iThread : workers) {
     130                        iThread.kill();
     131                }
     132                interrupt();
    90133        }
    91134
Note: See TracChangeset for help on using the changeset viewer.