- Timestamp:
- 10/21/13 15:27:22 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/server/MultiInvocationThread.java
r107 r108 1 1 package omq.server; 2 2 3 public class MultiInvocationThread { 3 import omq.common.util.ParameterQueue; 4 import omq.exception.SerializerException; 5 6 import org.apache.log4j.Logger; 7 8 import com.rabbitmq.client.ConsumerCancelledException; 9 import com.rabbitmq.client.QueueingConsumer; 10 import com.rabbitmq.client.QueueingConsumer.Delivery; 11 import com.rabbitmq.client.ShutdownSignalException; 12 13 public class MultiInvocationThread extends AInvocationThread { 14 15 private static final Logger logger = Logger.getLogger(MultiInvocationThread.class.getName()); 16 private static final String multi = "multi#"; 17 18 // Consumer 19 private String multiQueue; 20 21 public MultiInvocationThread(RemoteObject obj) throws Exception { 22 super(obj); 23 } 24 25 @Override 26 public void run() { 27 while (!killed) { 28 try { 29 // Get the delivery 30 Delivery delivery = consumer.nextDelivery(); 31 // This thread does not need to set busy because it's mandatory 32 // to exist 33 executeTask(delivery); 34 } catch (InterruptedException i) { 35 logger.error(i); 36 } catch (ShutdownSignalException e) { 37 logger.error(e); 38 try { 39 if (channel.isOpen()) { 40 channel.close(); 41 } 42 startQueues(); 43 } catch (Exception e1) { 44 try { 45 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); 46 Thread.sleep(milis); 47 } catch (InterruptedException e2) { 48 logger.error(e2); 49 } 50 logger.error(e1); 51 } 52 } catch (ConsumerCancelledException e) { 53 logger.error(e); 54 } catch (SerializerException e) { 55 logger.error(e); 56 } catch (Exception e) { 57 e.printStackTrace(); 58 logger.error(e); 59 } 60 61 } 62 logger.info("ObjectMQ ('" + obj.getRef() + "') InvocationThread " + Thread.currentThread().getId() + " is killed"); 63 } 64 65 @Override 66 protected void startQueues() throws Exception { 67 // Start channel 68 channel = broker.getNewChannel(); 69 70 /* 71 * Multi queue, exclusive per each instance 72 */ 73 74 // Get info about the multiQueue 75 String multiExchange = multi + reference; 76 // TODO:String multiExchange = multi + exchange + reference; 77 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME); 78 79 // Multi queue (exclusive queue per remoteObject) 80 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false")); 81 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true")); 82 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true")); 83 84 // Declares and bindings 85 channel.exchangeDeclare(multiExchange, "fanout"); 86 if (multiQueue == null) { 87 multiQueue = channel.queueDeclare().getQueue(); 88 } else { 89 channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null); 90 } 91 channel.queueBind(multiQueue, multiExchange, ""); 92 logger.info("RemoteObject: " + reference + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " 93 + multiDurable + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete); 94 95 /* 96 * Consumer 97 */ 98 99 // Disable Round Robin behavior 100 boolean autoAck = false; 101 102 // Declare a new consumer 103 consumer = new QueueingConsumer(channel); 104 channel.basicConsume(multiQueue, autoAck, consumer); 105 } 4 106 5 107 }
Note: See TracChangeset
for help on using the changeset viewer.