- Timestamp:
- 10/02/13 17:31:26 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/server/InvocationThread.java
r91 r96 1 1 package omq.server; 2 2 3 import java.io.IOException; 3 4 import java.lang.reflect.InvocationTargetException; 4 import java.util.concurrent.BlockingQueue; 5 5 import java.util.Properties; 6 7 import omq.common.broker.Broker; 6 8 import omq.common.message.Request; 7 9 import omq.common.message.Response; 10 import omq.common.util.ParameterQueue; 8 11 import omq.common.util.Serializer; 9 12 import omq.exception.OmqException; 13 import omq.exception.SerializerException; 10 14 11 15 import org.apache.log4j.Logger; … … 13 17 import com.rabbitmq.client.AMQP.BasicProperties; 14 18 import com.rabbitmq.client.Channel; 19 import com.rabbitmq.client.ConsumerCancelledException; 20 import com.rabbitmq.client.QueueingConsumer; 15 21 import com.rabbitmq.client.QueueingConsumer.Delivery; 22 import com.rabbitmq.client.ShutdownSignalException; 16 23 17 24 /** … … 22 29 */ 23 30 public class InvocationThread extends Thread { 31 24 32 private static final Logger logger = Logger.getLogger(InvocationThread.class.getName()); 33 private static final String multi = "multi#"; 34 35 // RemoteObject 25 36 private RemoteObject obj; 37 private String UID; 38 private Properties env; 39 40 // Broker 41 private Broker broker; 26 42 private Serializer serializer; 27 // private RemoteWrapper wrapper; 28 private BlockingQueue<Delivery> deliveryQueue; 43 44 // Consumer 45 private Channel channel; 46 private QueueingConsumer consumer; 47 private String multiQueue; 29 48 private boolean killed = false; 30 49 31 public InvocationThread(RemoteObject obj, RemoteWrapper wrapper, Serializer serializer){50 public InvocationThread(RemoteObject obj, Broker broker) throws Exception { 32 51 this.obj = obj; 33 // this.wrapper = wrapper; 34 this.deliveryQueue = wrapper.getDeliveryQueue(); 35 this.serializer = serializer; 52 this.UID = obj.getRef(); 53 this.env = obj.getEnv(); 54 this.broker = broker; 55 this.serializer = broker.getSerializer(); 56 } 57 58 @Override 59 public synchronized void start() { 60 try { 61 startQueues(); 62 super.start(); 63 } catch (Exception e) { 64 logger.error("Cannot start a remoteObject", e); 65 } 66 36 67 } 37 68 … … 41 72 try { 42 73 // Get the delivery 43 Delivery delivery = deliveryQueue.take(); 44 45 // // Indicate this thread is not available 46 // wrapper.increaseBusy(); 74 Delivery delivery = consumer.nextDelivery(); 47 75 48 76 String serializerType = delivery.getProperties().getType(); … … 69 97 } 70 98 71 72 Channel channel = obj.getChannel();73 74 75 99 // Reply if it's necessary 76 100 if (!request.isAsync()) { 77 101 Response resp = new Response(request.getId(), obj.getRef(), result, error); 78 79 80 102 81 103 BasicProperties props = delivery.getProperties(); … … 88 110 + props.getReplyTo()); 89 111 } 90 112 91 113 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 92 93 // // Indicate this thread is available94 // wrapper.decreaseBusy();95 114 } catch (InterruptedException i) { 96 115 logger.error(i); 97 killed = true; 116 } catch (ShutdownSignalException e) { 117 logger.error(e); 118 try { 119 if (channel.isOpen()) { 120 channel.close(); 121 } 122 startQueues(); 123 } catch (Exception e1) { 124 try { 125 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); 126 Thread.sleep(milis); 127 } catch (InterruptedException e2) { 128 logger.error(e2); 129 } 130 logger.error(e1); 131 } 132 } catch (ConsumerCancelledException e) { 133 logger.error(e); 134 } catch (SerializerException e) { 135 logger.error(e); 98 136 } catch (Exception e) { 99 logger.error( "Object: " + obj.getRef(),e);137 logger.error(e); 100 138 } 101 139 102 140 } 141 } 142 143 /** 144 * This method starts the queues using the information got in the 145 * environment. 146 * 147 * @throws Exception 148 */ 149 private void startQueues() throws Exception { 150 // Start channel 151 channel = broker.getNewChannel(); 152 153 /* 154 * Default queue, Round Robin behaviour 155 */ 156 157 // Get info about which exchange and queue will use 158 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, ""); 159 String queue = UID; 160 String routingKey = UID; 161 162 // RemoteObject default queue 163 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false")); 164 boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false")); 165 boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false")); 166 167 // Declares and bindings 168 if (!exchange.equalsIgnoreCase("")) { // Default exchange case 169 channel.exchangeDeclare(exchange, "direct"); 170 } 171 channel.queueDeclare(queue, durable, exclusive, autoDelete, null); 172 if (!exchange.equalsIgnoreCase("")) { // Default exchange case 173 channel.queueBind(queue, exchange, routingKey); 174 } 175 logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: " 176 + exclusive + ", AutoDelete: " + autoDelete); 177 178 /* 179 * Multi queue, exclusive per each instance 180 */ 181 182 // Get info about the multiQueue 183 String multiExchange = multi + UID; 184 // TODO:String multiExchange = multi + exchange + UID; 185 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME); 186 187 // Multi queue (exclusive queue per remoteObject) 188 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false")); 189 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true")); 190 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true")); 191 192 // Declares and bindings 193 channel.exchangeDeclare(multiExchange, "fanout"); 194 if (multiQueue == null) { 195 multiQueue = channel.queueDeclare().getQueue(); 196 } else { 197 channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null); 198 } 199 channel.queueBind(multiQueue, multiExchange, ""); 200 logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable 201 + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete); 202 203 /* 204 * Consumer 205 */ 206 207 boolean autoAck = false; 208 209 int prefetchCount = 1; 210 channel.basicQos(prefetchCount); 211 212 // Declare a new consumer 213 consumer = new QueueingConsumer(channel); 214 channel.basicConsume(queue, autoAck, consumer); 215 channel.basicConsume(multiQueue, autoAck, consumer); 216 } 217 218 public void kill() throws IOException { 219 logger.info("Killing objectmq: " + UID + " thread id"); 220 killed = true; 221 interrupt(); 222 channel.close(); 103 223 } 104 224 … … 111 231 } 112 232 113 public BlockingQueue<Delivery> getDeliveryQueue() {114 return deliveryQueue;115 }116 117 public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {118 this.deliveryQueue = deliveryQueue;119 }120 233 }
Note: See TracChangeset
for help on using the changeset viewer.