- Timestamp:
- 10/02/13 17:31:26 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/supervisor/src/main/java/omq/server/RemoteObject.java
r92 r96 12 12 import omq.common.broker.Broker; 13 13 import omq.common.util.ParameterQueue; 14 import omq.exception.SerializerException;15 14 16 15 import org.apache.log4j.Logger; 17 18 import com.rabbitmq.client.Channel;19 import com.rabbitmq.client.ConsumerCancelledException;20 import com.rabbitmq.client.QueueingConsumer;21 import com.rabbitmq.client.QueueingConsumer.Delivery;22 import com.rabbitmq.client.ShutdownSignalException;23 16 24 17 /** … … 32 25 * 33 26 */ 34 public abstract class RemoteObject extends Threadimplements Remote {27 public abstract class RemoteObject implements Remote { 35 28 36 29 private static final long serialVersionUID = -1778953938739846450L; 37 private static final String multi = "multi#";38 30 private static final Logger logger = Logger.getLogger(RemoteObject.class.getName()); 39 31 … … 41 33 private Properties env; 42 34 private transient Broker broker; 43 private transient String multiQueue;44 private transient RemoteWrapper remoteWrapper;45 35 private transient Map<String, List<Class<?>>> params; 46 private transient Channel channel; 47 private transient QueueingConsumer consumer; 48 private transient boolean killed = false; 36 private transient List<InvocationThread> invocationList; 49 37 50 38 private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>(); … … 60 48 } 61 49 62 public RemoteObject() {63 }64 65 50 /** 66 51 * This method starts a remoteObject. … … 90 75 // Get num threads to use 91 76 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1")); 92 this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer()); 93 94 startQueues(); 95 96 // Start this listener 97 this.start(); 98 } 99 100 @Override 101 public void run() { 102 while (!killed) { 103 try { 104 Delivery delivery = consumer.nextDelivery(); 105 106 logger.debug(UID + " has received a message, serializer: " + delivery.getProperties().getType()); 107 108 remoteWrapper.notifyDelivery(delivery); 109 } catch (InterruptedException i) { 110 logger.error(i); 111 } catch (ShutdownSignalException e) { 112 logger.error(e); 113 try { 114 if (channel.isOpen()) { 115 channel.close(); 116 } 117 startQueues(); 118 } catch (Exception e1) { 119 try { 120 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); 121 Thread.sleep(milis); 122 } catch (InterruptedException e2) { 123 logger.error(e2); 124 } 125 logger.error(e1); 126 } 127 } catch (ConsumerCancelledException e) { 128 logger.error(e); 129 } catch (SerializerException e) { 130 logger.error(e); 131 } catch (Exception e) { 132 logger.error(e); 133 } 134 } 77 invocationList = new ArrayList<InvocationThread>(numThreads); 78 79 // Start invocation threads 80 for (int i = 0; i < numThreads; i++) { 81 InvocationThread iThread = new InvocationThread(this, broker); 82 invocationList.add(iThread); 83 iThread.start(); 84 } 85 135 86 } 136 87 … … 147 98 */ 148 99 public void kill() throws IOException { 149 logger.warn("Killing objectmq: " + this.getRef()); 150 killed = true; 151 interrupt(); 152 channel.close(); 153 remoteWrapper.stopRemoteWrapper(); 100 logger.info("Killing objectmq: " + this.getRef()); 101 for (InvocationThread iThread : invocationList) { 102 iThread.kill(); 103 } 154 104 } 155 105 … … 250 200 } 251 201 252 public Channel getChannel() {253 return channel;254 }255 256 202 public Broker getBroker() { 257 203 return broker; 258 204 } 259 205 260 /** 261 * This method starts the queues using the information got in the 262 * environment. 263 * 264 * @throws Exception 265 */ 266 private void startQueues() throws Exception { 267 // Start channel 268 channel = broker.getNewChannel(); 269 270 /* 271 * Default queue, Round Robin behaviour 272 */ 273 274 // Get info about which exchange and queue will use 275 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, ""); 276 String queue = UID; 277 String routingKey = UID; 278 279 // RemoteObject default queue 280 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false")); 281 boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false")); 282 boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false")); 283 284 // Declares and bindings 285 if (!exchange.equalsIgnoreCase("")) { // Default exchange case 286 channel.exchangeDeclare(exchange, "direct"); 287 } 288 channel.queueDeclare(queue, durable, exclusive, autoDelete, null); 289 if (!exchange.equalsIgnoreCase("")) { // Default exchange case 290 channel.queueBind(queue, exchange, routingKey); 291 } 292 logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: " 293 + exclusive + ", AutoDelete: " + autoDelete); 294 295 /* 296 * Multi queue, exclusive per each instance 297 */ 298 299 // Get info about the multiQueue 300 String multiExchange = multi + UID; 301 multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME); 302 303 // Multi queue (exclusive queue per remoteObject) 304 boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false")); 305 boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true")); 306 boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true")); 307 308 // Declares and bindings 309 channel.exchangeDeclare(multiExchange, "fanout"); 310 if (multiQueue == null) { 311 multiQueue = channel.queueDeclare().getQueue(); 312 } else { 313 channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null); 314 } 315 channel.queueBind(multiQueue, multiExchange, ""); 316 logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable 317 + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete); 318 319 /* 320 * Consumer 321 */ 322 323 boolean autoAck = false; 324 325 //TODO see if this is useless 326 int prefetchCount = 1; 327 channel.basicQos(prefetchCount); 328 329 // Declare a new consumer 330 consumer = new QueueingConsumer(channel); 331 channel.basicConsume(queue, autoAck, consumer); 332 channel.basicConsume(multiQueue, autoAck, consumer); 206 public Properties getEnv() { 207 return env; 333 208 } 334 209
Note: See TracChangeset
for help on using the changeset viewer.