- Timestamp:
- 10/11/13 13:44:19 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/server/RemoteThreadPool.java
r100 r101 6 6 import java.util.concurrent.atomic.AtomicInteger; 7 7 8 import org.apache.log4j.Logger; 9 8 10 import omq.common.broker.Broker; 9 11 … … 11 13 import com.rabbitmq.client.Channel; 12 14 15 /** 16 * 17 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> 18 * 19 */ 20 13 21 public class RemoteThreadPool extends Thread { 22 private static final Logger logger = Logger.getLogger(RemoteThreadPool.class.getName()); 14 23 private List<InvocationThread> workers; 15 24 private AtomicInteger busy; … … 40 49 public void run() { 41 50 42 // Crear aquí tots els fils? 51 /* 52 * Create and start minPoolThreads 53 */ 54 logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads + ", refresh time: " 55 + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread); 56 57 for (int i = 0; i < minPoolThreads; i++) { 58 try { 59 InvocationThread iThread = new InvocationThread(obj); 60 workers.add(iThread); 61 iThread.start(); 62 } catch (Exception e) { 63 logger.error("Error while creating pool threads", e); 64 e.printStackTrace(); 65 } 66 } 43 67 44 68 while (!killed) { … … 55 79 if (numWorkers < maxPoolThreads && division >= maxMessagesPerThread) { 56 80 // Create a new thread 57 InvocationThread worker = new InvocationThread(obj, broker); 81 System.out.println("Add worker"); 82 InvocationThread worker = new InvocationThread(obj); 58 83 workers.add(worker); 59 84 worker.start(); 60 85 } else if (numWorkers > minPoolThreads && busy.get() < numWorkers) { 61 86 // Kill idle threads 87 System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = " + busy.get()); 62 88 stopIdleThreads(); 63 89 } … … 66 92 67 93 } catch (Exception e) { 68 94 e.printStackTrace(); 95 logger.error(e); 69 96 } 70 97 } … … 75 102 long now = System.currentTimeMillis(); 76 103 104 int i = 0; 77 105 for (InvocationThread worker : workers) { 106 // Ensure there are at least minThreads available 107 if (workers.size() == minPoolThreads) { 108 break; 109 } 78 110 long lastExec = worker.getLastExecution(); 79 if (worker.isIdle() && (lastExec - now) > keepAliveTime) { 111 System.out.println("last - now = " + (now - lastExec) + " keep alive = " + keepAliveTime); 112 if (worker.isIdle() && (now - lastExec) > keepAliveTime) { 80 113 // Kill thread 81 114 try { 82 115 worker.kill(); 116 workers.remove(i); 83 117 } catch (IOException e) { 84 118 // TODO Auto-generated catch block … … 87 121 88 122 } 123 i++; 89 124 } 125 } 126 127 public void kill() throws IOException { 128 killed = true; 129 for (InvocationThread iThread : workers) { 130 iThread.kill(); 131 } 132 interrupt(); 90 133 } 91 134
Note: See TracChangeset
for help on using the changeset viewer.