- Timestamp:
- 10/21/13 15:27:22 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/server/InvocationThread.java
r107 r108 1 1 package omq.server; 2 2 3 import java.io.IOException;4 import java.lang.reflect.InvocationTargetException;5 import java.util.Properties;6 7 import omq.common.broker.Broker;8 import omq.common.message.Request;9 import omq.common.message.Response;10 3 import omq.common.util.ParameterQueue; 11 import omq.common.util.Serializer;12 import omq.exception.OmqException;13 4 import omq.exception.SerializerException; 14 5 15 6 import org.apache.log4j.Logger; 16 7 17 import com.rabbitmq.client.AMQP.BasicProperties;18 import com.rabbitmq.client.Channel;19 8 import com.rabbitmq.client.ConsumerCancelledException; 20 9 import com.rabbitmq.client.QueueingConsumer; … … 28 17 * 29 18 */ 30 public class InvocationThread extends Thread {19 public class InvocationThread extends AInvocationThread { 31 20 32 21 private static final Logger logger = Logger.getLogger(InvocationThread.class.getName()); 33 private static final String multi = "multi#";34 22 35 23 // RemoteObject 36 private RemoteObject obj;37 private String reference;38 private String UID;39 private Properties env;40 24 private boolean idle; 41 25 private long lastExec; 42 26 43 private RemoteThreadPool pool;44 45 // Broker46 private Broker broker;47 private Serializer serializer;48 49 // Consumer50 private Channel channel;51 private QueueingConsumer consumer;52 private String multiQueue;53 private boolean killed = false;54 55 27 public InvocationThread(RemoteObject obj) throws Exception { 56 this.obj = obj; 57 this.UID = obj.getUID(); 58 this.reference = obj.getRef(); 59 this.env = obj.getEnv(); 60 this.broker = obj.getBroker(); 61 this.pool = obj.getPool(); 62 this.serializer = broker.getSerializer(); 28 super(obj); 63 29 this.lastExec = 0; 64 30 this.idle = true; 65 }66 67 @Override68 public synchronized void start() {69 try {70 startQueues();71 super.start();72 } catch (Exception e) {73 logger.error("Cannot start a remoteObject", e);74 }75 76 31 } 77 32 … … 87 42 idle = false; 88 43 89 String serializerType = delivery.getProperties().getType(); 90 91 // Deserialize the request 92 Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj); 93 String methodName = request.getMethod(); 94 String requestID = request.getId(); 95 96 logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " + serializerType); 97 98 // Invoke the method 99 Object result = null; 100 OmqException error = null; 101 try { 102 result = obj.invokeMethod(request.getMethod(), request.getParams()); 103 } catch (InvocationTargetException e) { 104 Throwable throwable = e.getTargetException(); 105 logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable); 106 error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage()); 107 } catch (NoSuchMethodException e) { 108 logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName); 109 error = new OmqException(e.getClass().getCanonicalName(), e.getMessage()); 110 } 111 112 // Reply if it's necessary 113 if (!request.isAsync()) { 114 Response resp = new Response(request.getId(), obj.getRef(), result, error); 115 116 BasicProperties props = delivery.getProperties(); 117 118 BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build(); 119 120 byte[] bytesResponse = serializer.serialize(serializerType, resp); 121 channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse); 122 logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: " 123 + props.getReplyTo()); 124 } 125 126 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 44 executeTask(delivery); 127 45 128 46 // The thread is now idle … … 168 86 * @throws Exception 169 87 */ 170 pr ivatevoid startQueues() throws Exception {88 protected void startQueues() throws Exception { 171 89 // Start channel 172 90 channel = broker.getNewChannel(); … … 190 108 channel.queueBind(queue, exchange, routingKey); 191 109 } 192 logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "193 + exclusive + ", AutoDelete: " + autoDelete);110 logger.info("RemoteObject: " + reference + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable 111 + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete); 194 112 195 113 /* … … 207 125 channel.queueBind(UID, exchange, UID); 208 126 } 127 // TODO logger queue 128 // TODO UID queue should be reference + UID 209 129 } 210 211 /*212 * Multi queue, exclusive per each instance213 */214 215 // Get info about the multiQueue216 String multiExchange = multi + reference;217 // TODO:String multiExchange = multi + exchange + reference;218 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);219 220 // Multi queue (exclusive queue per remoteObject)221 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));222 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));223 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));224 225 // Declares and bindings226 channel.exchangeDeclare(multiExchange, "fanout");227 if (multiQueue == null) {228 multiQueue = channel.queueDeclare().getQueue();229 } else {230 channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);231 }232 channel.queueBind(multiQueue, multiExchange, "");233 logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable234 + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);235 130 236 131 /* … … 238 133 */ 239 134 240 // Disable Round Robin behavior 135 // Disable Round Robin behavior 241 136 boolean autoAck = false; 242 137 … … 247 142 consumer = new QueueingConsumer(channel); 248 143 channel.basicConsume(queue, autoAck, consumer); 249 channel.basicConsume(multiQueue, autoAck, consumer);250 144 if (UID != null) { 251 145 channel.basicConsume(UID, autoAck, consumer); 252 146 } 253 }254 255 public void kill() throws IOException {256 logger.info("Killing objectmq: " + reference + " thread id");257 killed = true;258 interrupt();259 channel.close();260 }261 262 public RemoteObject getObj() {263 return obj;264 }265 266 public void setObj(RemoteObject obj) {267 this.obj = obj;268 147 } 269 148
Note: See TracChangeset
for help on using the changeset viewer.