Changeset 108 for branches/supervisor/src
- Timestamp:
- 10/21/13 15:27:22 (11 years ago)
- Location:
- branches/supervisor/src/main/java/omq/server
- Files:
-
- 1 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/server/InvocationThread.java
r107 r108 1 1 package omq.server; 2 2 3 import java.io.IOException;4 import java.lang.reflect.InvocationTargetException;5 import java.util.Properties;6 7 import omq.common.broker.Broker;8 import omq.common.message.Request;9 import omq.common.message.Response;10 3 import omq.common.util.ParameterQueue; 11 import omq.common.util.Serializer;12 import omq.exception.OmqException;13 4 import omq.exception.SerializerException; 14 5 15 6 import org.apache.log4j.Logger; 16 7 17 import com.rabbitmq.client.AMQP.BasicProperties;18 import com.rabbitmq.client.Channel;19 8 import com.rabbitmq.client.ConsumerCancelledException; 20 9 import com.rabbitmq.client.QueueingConsumer; … … 28 17 * 29 18 */ 30 public class InvocationThread extends Thread {19 public class InvocationThread extends AInvocationThread { 31 20 32 21 private static final Logger logger = Logger.getLogger(InvocationThread.class.getName()); 33 private static final String multi = "multi#";34 22 35 23 // RemoteObject 36 private RemoteObject obj;37 private String reference;38 private String UID;39 private Properties env;40 24 private boolean idle; 41 25 private long lastExec; 42 26 43 private RemoteThreadPool pool;44 45 // Broker46 private Broker broker;47 private Serializer serializer;48 49 // Consumer50 private Channel channel;51 private QueueingConsumer consumer;52 private String multiQueue;53 private boolean killed = false;54 55 27 public InvocationThread(RemoteObject obj) throws Exception { 56 this.obj = obj; 57 this.UID = obj.getUID(); 58 this.reference = obj.getRef(); 59 this.env = obj.getEnv(); 60 this.broker = obj.getBroker(); 61 this.pool = obj.getPool(); 62 this.serializer = broker.getSerializer(); 28 super(obj); 63 29 this.lastExec = 0; 64 30 this.idle = true; 65 }66 67 @Override68 public synchronized void start() {69 try {70 startQueues();71 super.start();72 } catch (Exception e) {73 logger.error("Cannot start a remoteObject", e);74 }75 76 31 } 77 32 … … 87 42 idle = false; 88 43 89 String serializerType = delivery.getProperties().getType(); 90 91 // Deserialize the request 92 Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj); 93 String methodName = request.getMethod(); 94 String requestID = request.getId(); 95 96 logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType); 97 98 // Invoke the method 99 Object result = null; 100 OmqException error = null; 101 try { 102 result = obj.invokeMethod(request.getMethod(), request.getParams()); 103 } catch (InvocationTargetException e) { 104 Throwable throwable = e.getTargetException(); 105 logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable); 106 error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage()); 107 } catch (NoSuchMethodException e) { 108 logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName); 109 error = new OmqException(e.getClass().getCanonicalName(), e.getMessage()); 110 } 111 112 // Reply if it's necessary 113 if (!request.isAsync()) { 114 Response resp = new Response(request.getId(), obj.getRef(), result, error); 115 116 BasicProperties props = delivery.getProperties(); 117 118 BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build(); 119 120 byte[] bytesResponse = serializer.serialize(serializerType, resp); 121 channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse); 122 logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: " 123 + props.getReplyTo()); 124 } 125 126 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 44 executeTask(delivery); 127 45 128 46 // The thread is now idle … … 168 86 * @throws Exception 169 87 */ 170 pr ivatevoid startQueues() throws Exception {88 protected void startQueues() throws Exception { 171 89 // Start channel 172 90 channel = broker.getNewChannel(); … … 190 108 channel.queueBind(queue, exchange, routingKey); 191 109 } 192 logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "193 + exclusive + ", AutoDelete: " + autoDelete);110 logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable 111 + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete); 194 112 195 113 /* … … 207 125 channel.queueBind(UID, exchange, UID); 208 126 } 127 // TODO logger queue 128 // TODO UID queue should be reference + UID 209 129 } 210 211 /*212 * Multi queue, exclusive per each instance213 */214 215 // Get info about the multiQueue216 String multiExchange = multi + reference;217 // TODO:String multiExchange = multi + exchange + reference;218 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);219 220 // Multi queue (exclusive queue per remoteObject)221 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));222 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));223 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));224 225 // Declares and bindings226 channel.exchangeDeclare(multiExchange, "fanout");227 if (multiQueue == null) {228 multiQueue = channel.queueDeclare().getQueue();229 } else {230 channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);231 }232 channel.queueBind(multiQueue, multiExchange, "");233 logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable234 + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);235 130 236 131 /* … … 238 133 */ 239 134 240 // Disable Round Robin behavior 135 // Disable Round Robin behavior 241 136 boolean autoAck = false; 242 137 … … 247 142 consumer = new QueueingConsumer(channel); 248 143 channel.basicConsume(queue, autoAck, consumer); 249 channel.basicConsume(multiQueue, autoAck, consumer);250 144 if (UID != null) { 251 145 channel.basicConsume(UID, autoAck, consumer); 252 146 } 253 }254 255 public void kill() throws IOException {256 logger.info("Killing objectmq: " + reference + " thread id");257 killed = true;258 interrupt();259 channel.close();260 }261 262 public RemoteObject getObj() {263 return obj;264 }265 266 public void setObj(RemoteObject obj) {267 this.obj = obj;268 147 } 269 148 -
branches/supervisor/src/main/java/omq/server/MultiInvocationThread.java
r107 r108 1 1 package omq.server; 2 2 3 public class MultiInvocationThread { 3 import omq.common.util.ParameterQueue; 4 import omq.exception.SerializerException; 5 6 import org.apache.log4j.Logger; 7 8 import com.rabbitmq.client.ConsumerCancelledException; 9 import com.rabbitmq.client.QueueingConsumer; 10 import com.rabbitmq.client.QueueingConsumer.Delivery; 11 import com.rabbitmq.client.ShutdownSignalException; 12 13 public class MultiInvocationThread extends AInvocationThread { 14 15 private static final Logger logger = Logger.getLogger(MultiInvocationThread.class.getName()); 16 private static final String multi = "multi#"; 17 18 // Consumer 19 private String multiQueue; 20 21 public MultiInvocationThread(RemoteObject obj) throws Exception { 22 super(obj); 23 } 24 25 @Override 26 public void run() { 27 while (!killed) { 28 try { 29 // Get the delivery 30 Delivery delivery = consumer.nextDelivery(); 31 // This thread does not need to set busy because it's mandatory 32 // to exist 33 executeTask(delivery); 34 } catch (InterruptedException i) { 35 logger.error(i); 36 } catch (ShutdownSignalException e) { 37 logger.error(e); 38 try { 39 if (channel.isOpen()) { 40 channel.close(); 41 } 42 startQueues(); 43 } catch (Exception e1) { 44 try { 45 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); 46 Thread.sleep(milis); 47 } catch (InterruptedException e2) { 48 logger.error(e2); 49 } 50 logger.error(e1); 51 } 52 } catch (ConsumerCancelledException e) { 53 logger.error(e); 54 } catch (SerializerException e) { 55 logger.error(e); 56 } catch (Exception e) { 57 e.printStackTrace(); 58 logger.error(e); 59 } 60 61 } 62 logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed"); 63 } 64 65 @Override 66 protected void startQueues() throws Exception { 67 // Start channel 68 channel = broker.getNewChannel(); 69 70 /* 71 * Multi queue, exclusive per each instance 72 */ 73 74 // Get info about the multiQueue 75 String multiExchange = multi + reference; 76 // TODO:String multiExchange = multi + exchange + reference; 77 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME); 78 79 // Multi queue (exclusive queue per remoteObject) 80 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false")); 81 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true")); 82 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true")); 83 84 // Declares and bindings 85 channel.exchangeDeclare(multiExchange, "fanout"); 86 if (multiQueue == null) { 87 multiQueue = channel.queueDeclare().getQueue(); 88 } else { 89 channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null); 90 } 91 channel.queueBind(multiQueue, multiExchange, ""); 92 logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " 93 + multiDurable + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete); 94 95 /* 96 * Consumer 97 */ 98 99 // Disable Round Robin behavior 100 boolean autoAck = false; 101 102 // Declare a new consumer 103 consumer = new QueueingConsumer(channel); 104 channel.basicConsume(multiQueue, autoAck, consumer); 105 } 4 106 5 107 } -
branches/supervisor/src/main/java/omq/server/RemoteThreadPool.java
r102 r108 23 23 private static final Logger logger = Logger.getLogger(RemoteThreadPool.class.getName()); 24 24 private List<InvocationThread> workers; 25 private MultiInvocationThread multiWorker; 25 26 private AtomicInteger busy; 26 27 private int minPoolThreads; … … 34 35 private boolean killed = false; 35 36 36 public RemoteThreadPool(int minPoolThreads, int maxPoolThreads, long refresh, long keepAliveTime, int maxMessagesPerThread, RemoteObject obj, Broker broker) { 37 public RemoteThreadPool(int minPoolThreads, int maxPoolThreads, long refresh, long keepAliveTime, int maxMessagesPerThread, 38 RemoteObject obj, Broker broker) { 37 39 this.minPoolThreads = minPoolThreads; 38 40 this.maxPoolThreads = maxPoolThreads; … … 53 55 * Create and start minPoolThreads 54 56 */ 55 logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads + ", refresh time: " 56 + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread); 57 logger.info("ObjectMQ reference: " + obj.getRef() + ", creating: " + minPoolThreads + ", maxPoolThreads: " + maxPoolThreads 58 + ", refresh time: " + refresh + ", keepAlive: " + keepAliveTime + ", maxMessagesPerThread: " + maxMessagesPerThread); 59 60 try { 61 multiWorker = new MultiInvocationThread(obj); 62 multiWorker.start(); 63 } catch (Exception e1) { 64 // TODO Auto-generated catch block 65 e1.printStackTrace(); 66 } 57 67 58 68 for (int i = 0; i < minPoolThreads; i++) { … … 86 96 } else if (numWorkers > minPoolThreads && busy.get() < numWorkers) { 87 97 // Kill idle threads 88 System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = " + busy.get()); 98 System.out.println("Kill lazy workers, numWorkers = " + numWorkers + ", minPool = " + minPoolThreads + ", busy = " 99 + busy.get()); 89 100 stopIdleThreads(); 90 101 }
Note: See TracChangeset
for help on using the changeset viewer.