Changeset 101
- Timestamp:
- 10/11/13 13:44:19 (11 years ago)
- Location:
- branches/supervisor/src
- Files:
-
- 4 added
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/common/util/ParameterQueue.java
r84 r101 138 138 139 139 /** 140 * Set the minimum number of threads in a pool 141 */ 142 public static String MIN_POOL_THREADS = "omq.min_num_threads"; 143 144 /** 145 * Set the maximum number of threads in a pool 146 */ 147 public static String MAX_POOL_THREADS = "omq.max_num_threads"; 148 149 /** 150 * Set the refresh time to see how many messages are on the queue 151 */ 152 public static String POOL_REFRESH_TIME = "omq.refresh_time"; 153 154 /** 155 * Set the maximum number of threads in a pool 156 */ 157 public static String KEEP_ALIVE_TIME = "omq.keep_alive_time"; 158 159 /** 160 * Set the maximum number of messages per thread to create a new one 161 */ 162 public static String MAX_MESSAGES_PER_THREAD = "omq.max_messages_per_thread"; 163 164 /** 140 165 * Time in milis by default is set in a minute 141 166 */ -
branches/supervisor/src/main/java/omq/server/InvocationThread.java
r100 r101 40 40 private long lastExec; 41 41 42 private RemoteThreadPool pool; // TODO posar això bé42 private RemoteThreadPool pool; 43 43 44 44 // Broker … … 52 52 private boolean killed = false; 53 53 54 public InvocationThread(RemoteObject obj , Broker broker) throws Exception {54 public InvocationThread(RemoteObject obj) throws Exception { 55 55 this.obj = obj; 56 56 this.UID = obj.getRef(); 57 57 this.env = obj.getEnv(); 58 this.broker = broker; 58 this.broker = obj.getBroker(); 59 this.pool = obj.getPool(); 59 60 this.serializer = broker.getSerializer(); 60 61 this.lastExec = 0; … … 151 152 logger.error(e); 152 153 } catch (Exception e) { 154 e.printStackTrace(); 153 155 logger.error(e); 154 156 } -
branches/supervisor/src/main/java/omq/server/RemoteObject.java
r100 r101 35 35 private transient RemoteThreadPool pool; 36 36 private transient Map<String, List<Class<?>>> params; 37 private transient List<InvocationThread> invocationList;38 37 39 38 private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>(); … … 74 73 } 75 74 76 // Get num threads to use 77 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1")); 78 invocationList = new ArrayList<InvocationThread>(numThreads); 79 80 // Start invocation threads 81 for (int i = 0; i < numThreads; i++) { 82 InvocationThread iThread = new InvocationThread(this, broker); 83 invocationList.add(iThread); 84 iThread.start(); 85 } 86 75 // Get pool information 76 int minPoolThreads = Integer.parseInt(env.getProperty(ParameterQueue.MIN_POOL_THREADS, "1")); 77 int maxPoolThreads = Integer.parseInt(env.getProperty(ParameterQueue.MAX_POOL_THREADS, "1")); 78 long refresh = Long.parseLong(env.getProperty(ParameterQueue.POOL_REFRESH_TIME, "60000")); 79 long keepAliveTime = Long.parseLong(env.getProperty(ParameterQueue.KEEP_ALIVE_TIME, "30000")); 80 int maxMessagesPerThread = Integer.parseInt(env.getProperty(ParameterQueue.MAX_MESSAGES_PER_THREAD, "5")); 81 82 // Create the pool & start it 83 pool = new RemoteThreadPool(minPoolThreads, maxPoolThreads, refresh, keepAliveTime, maxMessagesPerThread, this, broker); 84 pool.start(); 87 85 } 88 86 … … 100 98 public void kill() throws IOException { 101 99 logger.info("Killing objectmq: " + this.getRef()); 102 for (InvocationThread iThread : invocationList) { 103 iThread.kill(); 104 } 100 pool.kill(); 105 101 } 106 102 … … 205 201 } 206 202 203 public RemoteThreadPool getPool() { 204 return pool; 205 } 206 207 207 public Properties getEnv() { 208 208 return env; -
branches/supervisor/src/main/java/omq/server/RemoteThreadPool.java
r100 r101 6 6 import java.util.concurrent.atomic.AtomicInteger; 7 7 8 import org.apache.log4j.Logger; 9 8 10 import omq.common.broker.Broker; 9 11 … … 11 13 import com.rabbitmq.client.Channel; 12 14 15 /** 16 * 17 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> 18 * 19 */ 20 13 21 public class RemoteThreadPool extends Thread { 22 private static final Logger logger = Logger.getLogger(RemoteThreadPool.class.getName()); 14 23 private List<InvocationThread> workers; 15 24 private AtomicInteger busy; … … 40 49 public void run() { 41 50 42 // Crear aquí tots els fils? 51 /* 52 * Create and start minPoolThreads 53 */ 54 logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads + ", refresh time: " 55 + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread); 56 57 for (int i = 0; i < minPoolThreads; i++) { 58 try { 59 InvocationThread iThread = new InvocationThread(obj); 60 workers.add(iThread); 61 iThread.start(); 62 } catch (Exception e) { 63 logger.error("Error while creating pool threads", e); 64 e.printStackTrace(); 65 } 66 } 43 67 44 68 while (!killed) { … … 55 79 if (numWorkers < maxPoolThreads && division >= maxMessagesPerThread) { 56 80 // Create a new thread 57 InvocationThread worker = new InvocationThread(obj, broker); 81 System.out.println("Add worker"); 82 InvocationThread worker = new InvocationThread(obj); 58 83 workers.add(worker); 59 84 worker.start(); 60 85 } else if (numWorkers > minPoolThreads && busy.get() < numWorkers) { 61 86 // Kill idle threads 87 System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = " + busy.get()); 62 88 stopIdleThreads(); 63 89 } … … 66 92 67 93 } catch (Exception e) { 68 94 e.printStackTrace(); 95 logger.error(e); 69 96 } 70 97 } … … 75 102 long now = System.currentTimeMillis(); 76 103 104 int i = 0; 77 105 for (InvocationThread worker : workers) { 106 // Ensure there are at least minThreads available 107 if (workers.size() == minPoolThreads) { 108 break; 109 } 78 110 long lastExec = worker.getLastExecution(); 79 if (worker.isIdle() && (lastExec - now) > keepAliveTime) { 111 System.out.println("last - now = " + (now - lastExec) + " keep alive = " + keepAliveTime); 112 if (worker.isIdle() && (now - lastExec) > keepAliveTime) { 80 113 // Kill thread 81 114 try { 82 115 worker.kill(); 116 workers.remove(i); 83 117 } catch (IOException e) { 84 118 // TODO Auto-generated catch block … … 87 121 88 122 } 123 i++; 89 124 } 125 } 126 127 public void kill() throws IOException { 128 killed = true; 129 for (InvocationThread iThread : workers) { 130 iThread.kill(); 131 } 132 interrupt(); 90 133 } 91 134
Note: See TracChangeset
for help on using the changeset viewer.