Changeset 101


Ignore:
Timestamp:
10/11/13 13:44:19 (11 years ago)
Author:
stoda
Message:

PoolThreadSupervisor? done

TODO: revise all threads

Location:
branches/supervisor/src
Files:
4 added
4 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/common/util/ParameterQueue.java

    r84 r101  
    138138
    139139        /**
     140         * Set the minimum number of threads in a pool
     141         */
     142        public static String MIN_POOL_THREADS = "omq.min_num_threads";
     143
     144        /**
     145         * Set the maximum number of threads in a pool
     146         */
     147        public static String MAX_POOL_THREADS = "omq.max_num_threads";
     148
     149        /**
     150         * Set the refresh time to see how many messages are on the queue
     151         */
     152        public static String POOL_REFRESH_TIME = "omq.refresh_time";
     153
     154        /**
     155         * Set the maximum number of threads in a pool
     156         */
     157        public static String KEEP_ALIVE_TIME = "omq.keep_alive_time";
     158
     159        /**
     160         * Set the maximum number of messages per thread to create a new one
     161         */
     162        public static String MAX_MESSAGES_PER_THREAD = "omq.max_messages_per_thread";
     163
     164        /**
    140165         * Time in milis by default is set in a minute
    141166         */
  • branches/supervisor/src/main/java/omq/server/InvocationThread.java

    r100 r101  
    4040        private long lastExec;
    4141
    42         private RemoteThreadPool pool; // TODO posar això bé
     42        private RemoteThreadPool pool;
    4343
    4444        // Broker
     
    5252        private boolean killed = false;
    5353
    54         public InvocationThread(RemoteObject obj, Broker broker) throws Exception {
     54        public InvocationThread(RemoteObject obj) throws Exception {
    5555                this.obj = obj;
    5656                this.UID = obj.getRef();
    5757                this.env = obj.getEnv();
    58                 this.broker = broker;
     58                this.broker = obj.getBroker();
     59                this.pool = obj.getPool();
    5960                this.serializer = broker.getSerializer();
    6061                this.lastExec = 0;
     
    151152                                logger.error(e);
    152153                        } catch (Exception e) {
     154                                e.printStackTrace();
    153155                                logger.error(e);
    154156                        }
  • branches/supervisor/src/main/java/omq/server/RemoteObject.java

    r100 r101  
    3535        private transient RemoteThreadPool pool;
    3636        private transient Map<String, List<Class<?>>> params;
    37         private transient List<InvocationThread> invocationList;
    3837
    3938        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
     
    7473                }
    7574
    76                 // Get num threads to use
    77                 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
    78                 invocationList = new ArrayList<InvocationThread>(numThreads);
    79 
    80                 // Start invocation threads
    81                 for (int i = 0; i < numThreads; i++) {
    82                         InvocationThread iThread = new InvocationThread(this, broker);
    83                         invocationList.add(iThread);
    84                         iThread.start();
    85                 }
    86 
     75                // Get pool information
     76                int minPoolThreads = Integer.parseInt(env.getProperty(ParameterQueue.MIN_POOL_THREADS, "1"));
     77                int maxPoolThreads = Integer.parseInt(env.getProperty(ParameterQueue.MAX_POOL_THREADS, "1"));
     78                long refresh = Long.parseLong(env.getProperty(ParameterQueue.POOL_REFRESH_TIME, "60000"));
     79                long keepAliveTime = Long.parseLong(env.getProperty(ParameterQueue.KEEP_ALIVE_TIME, "30000"));
     80                int maxMessagesPerThread = Integer.parseInt(env.getProperty(ParameterQueue.MAX_MESSAGES_PER_THREAD, "5"));
     81
     82                // Create the pool & start it
     83                pool = new RemoteThreadPool(minPoolThreads, maxPoolThreads, refresh, keepAliveTime, maxMessagesPerThread, this, broker);
     84                pool.start();
    8785        }
    8886
     
    10098        public void kill() throws IOException {
    10199                logger.info("Killing objectmq: " + this.getRef());
    102                 for (InvocationThread iThread : invocationList) {
    103                         iThread.kill();
    104                 }
     100                pool.kill();
    105101        }
    106102
     
    205201        }
    206202
     203        public RemoteThreadPool getPool() {
     204                return pool;
     205        }
     206
    207207        public Properties getEnv() {
    208208                return env;
  • branches/supervisor/src/main/java/omq/server/RemoteThreadPool.java

    r100 r101  
    66import java.util.concurrent.atomic.AtomicInteger;
    77
     8import org.apache.log4j.Logger;
     9
    810import omq.common.broker.Broker;
    911
     
    1113import com.rabbitmq.client.Channel;
    1214
     15/**
     16 *
     17 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
     18 *
     19 */
     20
    1321public class RemoteThreadPool extends Thread {
     22        private static final Logger logger = Logger.getLogger(RemoteThreadPool.class.getName());
    1423        private List<InvocationThread> workers;
    1524        private AtomicInteger busy;
     
    4049        public void run() {
    4150
    42                 // Crear aquí tots els fils?
     51                /*
     52                 * Create and start minPoolThreads
     53                 */
     54                logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads + ", refresh time: "
     55                                + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread);
     56
     57                for (int i = 0; i < minPoolThreads; i++) {
     58                        try {
     59                                InvocationThread iThread = new InvocationThread(obj);
     60                                workers.add(iThread);
     61                                iThread.start();
     62                        } catch (Exception e) {
     63                                logger.error("Error while creating pool threads", e);
     64                                e.printStackTrace();
     65                        }
     66                }
    4367
    4468                while (!killed) {
     
    5579                                if (numWorkers < maxPoolThreads && division >= maxMessagesPerThread) {
    5680                                        // Create a new thread
    57                                         InvocationThread worker = new InvocationThread(obj, broker);
     81                                        System.out.println("Add worker");
     82                                        InvocationThread worker = new InvocationThread(obj);
    5883                                        workers.add(worker);
    5984                                        worker.start();
    6085                                } else if (numWorkers > minPoolThreads && busy.get() < numWorkers) {
    6186                                        // Kill idle threads
     87                                        System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = " + busy.get());
    6288                                        stopIdleThreads();
    6389                                }
     
    6692
    6793                        } catch (Exception e) {
    68 
     94                                e.printStackTrace();
     95                                logger.error(e);
    6996                        }
    7097                }
     
    75102                long now = System.currentTimeMillis();
    76103
     104                int i = 0;
    77105                for (InvocationThread worker : workers) {
     106                        // Ensure there are at least minThreads available
     107                        if (workers.size() == minPoolThreads) {
     108                                break;
     109                        }
    78110                        long lastExec = worker.getLastExecution();
    79                         if (worker.isIdle() && (lastExec - now) > keepAliveTime) {
     111                        System.out.println("last - now = " + (now - lastExec) + " keep alive = " + keepAliveTime);
     112                        if (worker.isIdle() && (now - lastExec) > keepAliveTime) {
    80113                                // Kill thread
    81114                                try {
    82115                                        worker.kill();
     116                                        workers.remove(i);
    83117                                } catch (IOException e) {
    84118                                        // TODO Auto-generated catch block
     
    87121
    88122                        }
     123                        i++;
    89124                }
     125        }
     126
     127        public void kill() throws IOException {
     128                killed = true;
     129                for (InvocationThread iThread : workers) {
     130                        iThread.kill();
     131                }
     132                interrupt();
    90133        }
    91134
Note: See TracChangeset for help on using the changeset viewer.