Changeset 96
- Timestamp:
- 10/02/13 17:31:26 (11 years ago)
- Location:
- branches/supervisor/src
- Files:
-
- 4 added
- 1 deleted
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/server/InvocationThread.java
r91 r96 1 1 package omq.server; 2 2 3 import java.io.IOException; 3 4 import java.lang.reflect.InvocationTargetException; 4 import java.util.concurrent.BlockingQueue; 5 5 import java.util.Properties; 6 7 import omq.common.broker.Broker; 6 8 import omq.common.message.Request; 7 9 import omq.common.message.Response; 10 import omq.common.util.ParameterQueue; 8 11 import omq.common.util.Serializer; 9 12 import omq.exception.OmqException; 13 import omq.exception.SerializerException; 10 14 11 15 import org.apache.log4j.Logger; … … 13 17 import com.rabbitmq.client.AMQP.BasicProperties; 14 18 import com.rabbitmq.client.Channel; 19 import com.rabbitmq.client.ConsumerCancelledException; 20 import com.rabbitmq.client.QueueingConsumer; 15 21 import com.rabbitmq.client.QueueingConsumer.Delivery; 22 import com.rabbitmq.client.ShutdownSignalException; 16 23 17 24 /** … … 22 29 */ 23 30 public class InvocationThread extends Thread { 31 24 32 private static final Logger logger = Logger.getLogger(InvocationThread.class.getName()); 33 private static final String multi = "multi#"; 34 35 // RemoteObject 25 36 private RemoteObject obj; 37 private String UID; 38 private Properties env; 39 40 // Broker 41 private Broker broker; 26 42 private Serializer serializer; 27 // private RemoteWrapper wrapper; 28 private BlockingQueue<Delivery> deliveryQueue; 43 44 // Consumer 45 private Channel channel; 46 private QueueingConsumer consumer; 47 private String multiQueue; 29 48 private boolean killed = false; 30 49 31 public InvocationThread(RemoteObject obj, RemoteWrapper wrapper, Serializer serializer){50 public InvocationThread(RemoteObject obj, Broker broker) throws Exception { 32 51 this.obj = obj; 33 // this.wrapper = wrapper; 34 this.deliveryQueue = wrapper.getDeliveryQueue(); 35 this.serializer = serializer; 52 this.UID = obj.getRef(); 53 this.env = obj.getEnv(); 54 this.broker = broker; 55 this.serializer = broker.getSerializer(); 56 } 57 58 @Override 59 public synchronized void start() { 60 try { 61 startQueues(); 62 super.start(); 63 } catch (Exception e) { 64 logger.error("Cannot start a remoteObject", e); 65 } 66 36 67 } 37 68 … … 41 72 try { 42 73 // Get the delivery 43 Delivery delivery = deliveryQueue.take(); 44 45 // // Indicate this thread is not available 46 // wrapper.increaseBusy(); 74 Delivery delivery = consumer.nextDelivery(); 47 75 48 76 String serializerType = delivery.getProperties().getType(); … … 69 97 } 70 98 71 72 Channel channel = obj.getChannel();73 74 75 99 // Reply if it's necessary 76 100 if (!request.isAsync()) { 77 101 Response resp = new Response(request.getId(), obj.getRef(), result, error); 78 79 80 102 81 103 BasicProperties props = delivery.getProperties(); … … 88 110 + props.getReplyTo()); 89 111 } 90 112 91 113 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 92 93 // // Indicate this thread is available94 // wrapper.decreaseBusy();95 114 } catch (InterruptedException i) { 96 115 logger.error(i); 97 killed = true; 116 } catch (ShutdownSignalException e) { 117 logger.error(e); 118 try { 119 if (channel.isOpen()) { 120 channel.close(); 121 } 122 startQueues(); 123 } catch (Exception e1) { 124 try { 125 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); 126 Thread.sleep(milis); 127 } catch (InterruptedException e2) { 128 logger.error(e2); 129 } 130 logger.error(e1); 131 } 132 } catch (ConsumerCancelledException e) { 133 logger.error(e); 134 } catch (SerializerException e) { 135 logger.error(e); 98 136 } catch (Exception e) { 99 logger.error( "Object: " + obj.getRef(),e);137 logger.error(e); 100 138 } 101 139 102 140 } 141 } 142 143 /** 144 * This method starts the queues using the information got in the 145 * environment. 146 * 147 * @throws Exception 148 */ 149 private void startQueues() throws Exception { 150 // Start channel 151 channel = broker.getNewChannel(); 152 153 /* 154 * Default queue, Round Robin behaviour 155 */ 156 157 // Get info about which exchange and queue will use 158 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, ""); 159 String queue = UID; 160 String routingKey = UID; 161 162 // RemoteObject default queue 163 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false")); 164 boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false")); 165 boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false")); 166 167 // Declares and bindings 168 if (!exchange.equalsIgnoreCase("")) { // Default exchange case 169 channel.exchangeDeclare(exchange, "direct"); 170 } 171 channel.queueDeclare(queue, durable, exclusive, autoDelete, null); 172 if (!exchange.equalsIgnoreCase("")) { // Default exchange case 173 channel.queueBind(queue, exchange, routingKey); 174 } 175 logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: " 176 + exclusive + ", AutoDelete: " + autoDelete); 177 178 /* 179 * Multi queue, exclusive per each instance 180 */ 181 182 // Get info about the multiQueue 183 String multiExchange = multi + UID; 184 // TODO:String multiExchange = multi + exchange + UID; 185 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME); 186 187 // Multi queue (exclusive queue per remoteObject) 188 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false")); 189 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true")); 190 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true")); 191 192 // Declares and bindings 193 channel.exchangeDeclare(multiExchange, "fanout"); 194 if (multiQueue == null) { 195 multiQueue = channel.queueDeclare().getQueue(); 196 } else { 197 channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null); 198 } 199 channel.queueBind(multiQueue, multiExchange, ""); 200 logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable 201 + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete); 202 203 /* 204 * Consumer 205 */ 206 207 boolean autoAck = false; 208 209 int prefetchCount = 1; 210 channel.basicQos(prefetchCount); 211 212 // Declare a new consumer 213 consumer = new QueueingConsumer(channel); 214 channel.basicConsume(queue, autoAck, consumer); 215 channel.basicConsume(multiQueue, autoAck, consumer); 216 } 217 218 public void kill() throws IOException { 219 logger.info("Killing objectmq: " + UID + " thread id"); 220 killed = true; 221 interrupt(); 222 channel.close(); 103 223 } 104 224 … … 111 231 } 112 232 113 public BlockingQueue<Delivery> getDeliveryQueue() {114 return deliveryQueue;115 }116 117 public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {118 this.deliveryQueue = deliveryQueue;119 }120 233 } -
branches/supervisor/src/main/java/omq/server/RemoteObject.java
r92 r96 12 12 import omq.common.broker.Broker; 13 13 import omq.common.util.ParameterQueue; 14 import omq.exception.SerializerException;15 14 16 15 import org.apache.log4j.Logger; 17 18 import com.rabbitmq.client.Channel;19 import com.rabbitmq.client.ConsumerCancelledException;20 import com.rabbitmq.client.QueueingConsumer;21 import com.rabbitmq.client.QueueingConsumer.Delivery;22 import com.rabbitmq.client.ShutdownSignalException;23 16 24 17 /** … … 32 25 * 33 26 */ 34 public abstract class RemoteObject extends Threadimplements Remote {27 public abstract class RemoteObject implements Remote { 35 28 36 29 private static final long serialVersionUID = -1778953938739846450L; 37 private static final String multi = "multi#";38 30 private static final Logger logger = Logger.getLogger(RemoteObject.class.getName()); 39 31 … … 41 33 private Properties env; 42 34 private transient Broker broker; 43 private transient String multiQueue;44 private transient RemoteWrapper remoteWrapper;45 35 private transient Map<String, List<Class<?>>> params; 46 private transient Channel channel; 47 private transient QueueingConsumer consumer; 48 private transient boolean killed = false; 36 private transient List<InvocationThread> invocationList; 49 37 50 38 private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>(); … … 60 48 } 61 49 62 public RemoteObject() {63 }64 65 50 /** 66 51 * This method starts a remoteObject. … … 90 75 // Get num threads to use 91 76 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1")); 92 this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer()); 93 94 startQueues(); 95 96 // Start this listener 97 this.start(); 98 } 99 100 @Override 101 public void run() { 102 while (!killed) { 103 try { 104 Delivery delivery = consumer.nextDelivery(); 105 106 logger.debug(UID + " has received a message, serializer: " + delivery.getProperties().getType()); 107 108 remoteWrapper.notifyDelivery(delivery); 109 } catch (InterruptedException i) { 110 logger.error(i); 111 } catch (ShutdownSignalException e) { 112 logger.error(e); 113 try { 114 if (channel.isOpen()) { 115 channel.close(); 116 } 117 startQueues(); 118 } catch (Exception e1) { 119 try { 120 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); 121 Thread.sleep(milis); 122 } catch (InterruptedException e2) { 123 logger.error(e2); 124 } 125 logger.error(e1); 126 } 127 } catch (ConsumerCancelledException e) { 128 logger.error(e); 129 } catch (SerializerException e) { 130 logger.error(e); 131 } catch (Exception e) { 132 logger.error(e); 133 } 134 } 77 invocationList = new ArrayList<InvocationThread>(numThreads); 78 79 // Start invocation threads 80 for (int i = 0; i < numThreads; i++) { 81 InvocationThread iThread = new InvocationThread(this, broker); 82 invocationList.add(iThread); 83 iThread.start(); 84 } 85 135 86 } 136 87 … … 147 98 */ 148 99 public void kill() throws IOException { 149 logger.warn("Killing objectmq: " + this.getRef()); 150 killed = true; 151 interrupt(); 152 channel.close(); 153 remoteWrapper.stopRemoteWrapper(); 100 logger.info("Killing objectmq: " + this.getRef()); 101 for (InvocationThread iThread : invocationList) { 102 iThread.kill(); 103 } 154 104 } 155 105 … … 250 200 } 251 201 252 public Channel getChannel() {253 return channel;254 }255 256 202 public Broker getBroker() { 257 203 return broker; 258 204 } 259 205 260 /** 261 * This method starts the queues using the information got in the 262 * environment. 263 * 264 * @throws Exception 265 */ 266 private void startQueues() throws Exception { 267 // Start channel 268 channel = broker.getNewChannel(); 269 270 /* 271 * Default queue, Round Robin behaviour 272 */ 273 274 // Get info about which exchange and queue will use 275 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, ""); 276 String queue = UID; 277 String routingKey = UID; 278 279 // RemoteObject default queue 280 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false")); 281 boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false")); 282 boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false")); 283 284 // Declares and bindings 285 if (!exchange.equalsIgnoreCase("")) { // Default exchange case 286 channel.exchangeDeclare(exchange, "direct"); 287 } 288 channel.queueDeclare(queue, durable, exclusive, autoDelete, null); 289 if (!exchange.equalsIgnoreCase("")) { // Default exchange case 290 channel.queueBind(queue, exchange, routingKey); 291 } 292 logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: " 293 + exclusive + ", AutoDelete: " + autoDelete); 294 295 /* 296 * Multi queue, exclusive per each instance 297 */ 298 299 // Get info about the multiQueue 300 String multiExchange = multi + UID; 301 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME); 302 303 // Multi queue (exclusive queue per remoteObject) 304 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false")); 305 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true")); 306 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true")); 307 308 // Declares and bindings 309 channel.exchangeDeclare(multiExchange, "fanout"); 310 if (multiQueue == null) { 311 multiQueue = channel.queueDeclare().getQueue(); 312 } else { 313 channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null); 314 } 315 channel.queueBind(multiQueue, multiExchange, ""); 316 logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable 317 + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete); 318 319 /* 320 * Consumer 321 */ 322 323 boolean autoAck = false; 324 325 //TODO see if this is useless 326 int prefetchCount = 1; 327 channel.basicQos(prefetchCount); 328 329 // Declare a new consumer 330 consumer = new QueueingConsumer(channel); 331 channel.basicConsume(queue, autoAck, consumer); 332 channel.basicConsume(multiQueue, autoAck, consumer); 206 public Properties getEnv() { 207 return env; 333 208 } 334 209 -
branches/supervisor/src/test/java/omq/test/supervisor/SleepImpl.java
r95 r96 16 16 try { 17 17 System.out.println("I'm going to sleep!!!!!!!!" + Thread.currentThread().getId()); 18 Thread.sleep( 1000);18 Thread.sleep(2000); 19 19 } catch (InterruptedException e) { 20 20 e.printStackTrace(); -
branches/supervisor/src/test/java/omq/test/supervisor/SleepTest.java
r95 r96 24 24 env1.setProperty(ParameterQueue.RABBIT_HOST, "127.0.0.1"); 25 25 env1.setProperty(ParameterQueue.RABBIT_PORT, "5672"); 26 env1.setProperty(ParameterQueue.NUM_THREADS, " 1");26 env1.setProperty(ParameterQueue.NUM_THREADS, "4"); 27 27 28 28 Broker broker = new Broker(env1);
Note: See TracChangeset
for help on using the changeset viewer.