package omq.client.listener; import java.io.IOException; import java.util.HashMap; import java.util.Hashtable; import java.util.Map; import java.util.Properties; import org.apache.log4j.Logger; import omq.client.proxy.Proxymq; import omq.common.broker.Broker; import omq.common.util.ParameterQueue; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; /** * Class that inherits from RemoteListener. It's used in the server side. This * class gets the deliveries from the server and stores them into the proxies * * @author Sergi Toda * */ public class ResponseListener extends Thread { private final Logger logger = Logger.getLogger(ResponseListener.class.getName()); private Broker broker; private Channel channel; private QueueingConsumer consumer; private boolean killed = false; private Map> results; private Properties env; /** * Protected constructor used by the singleton pattern * * @param env * @throws Exception */ public ResponseListener(Broker broker) throws Exception { this.broker = broker; env = broker.getEnvironment(); // Init the hashtable (it's concurrent) results = new Hashtable>(); startRPCQueue(); } @Override public void run() { logger.info("ResponseListener started"); Delivery delivery; String uid_request; while (!killed) { try { // Get the delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); // Get the response with its uid uid_request = delivery.getProperties().getCorrelationId(); logger.debug("Response received -> proxy reference: " + props.getAppId() + ", corrId: " + uid_request); // Stores the new response Map proxyResults = results.get(props.getAppId()); // Put the result into the proxy results and notify him synchronized (proxyResults) { // If we haven't received this response before, we store it if (!proxyResults.containsKey(uid_request)) { proxyResults.put(uid_request, delivery.getBody()); proxyResults.notifyAll(); } } } catch (InterruptedException i) { logger.error(i.toString(), i); } catch (ShutdownSignalException e) { logger.error(e.toString(), e); try { if (channel.isOpen()) { channel.close(); } startRPCQueue(); } catch (Exception e1) { e1.printStackTrace(); try { long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); Thread.sleep(milis); } catch (InterruptedException e2) { logger.error(e2.toString(), e2); } } } catch (ConsumerCancelledException e) { logger.error(e.toString(), e); } catch (Exception e) { logger.error(e.toString(), e); } } } private void startRPCQueue() throws Exception { channel = broker.getNewChannel(); Map args = null; String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false")); int ttl = Integer.parseInt(env.getProperty(ParameterQueue.MESSAGE_TTL_IN_QUEUES, "-1")); if (ttl > 0) { args = new HashMap(); args.put("x-message-ttl", ttl); } channel.queueDeclare(reply_queue, durable, false, false, args); logger.info("ResponseListener creating queue: " + reply_queue + ", durable: " + durable + "TTL: " + (ttl > 0 ? ttl : "not set")); // Declare a new consumer consumer = new QueueingConsumer(channel); channel.basicConsume(reply_queue, true, consumer); } /** * * @param key * @return whether the map has the param key */ public boolean containsKey(String key) { return results.containsKey(key); } /** * Interrupt and kill the Thread * * @throws IOException */ public void kill() throws IOException { logger.warn("Killing ResponseListener"); interrupt(); killed = true; channel.close(); } // Revisar això public void registerProxy(Proxymq proxy) { if (!results.containsKey(proxy.getRef())) { results.put(proxy.getRef(), proxy.getResults()); } } }