package omq.client.remote.response; import java.util.Hashtable; import java.util.Map; import java.util.Properties; import omq.client.proxy.Proxymq; import omq.common.remote.RemoteListener; import omq.common.util.ParameterQueue; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; /** * Class that inherits from RemoteListener. It's used in the server side. This * class gets the deliveries from the server and stores them into the proxies * * @author Sergi Toda * */ public class ResponseListener extends RemoteListener { private static ResponseListener rListener; private Map> results; /** * Protected constructor used by the singleton pattern * * @param env * @throws Exception */ protected ResponseListener(Properties env) throws Exception { super(env); // Init the hashtable (it's concurrent) this.results = new Hashtable>(); String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); channel.queueDeclare(reply_queue, false, false, false, null); // Declare a new consumer consumer = new QueueingConsumer(channel); channel.basicConsume(reply_queue, true, consumer); } @Override public void run() { Delivery delivery; String uid_request; while (!killed) { try { // Get the delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); // Get the response with its uid uid_request = delivery.getProperties().getCorrelationId(); System.out.println("Response received -> " + uid_request); // Stores the new response Map proxyResults = results.get(props.getAppId()); // Put the result into the proxy results and notify him synchronized (proxyResults) { // If we haven't received this response before, we store it if (!proxyResults.containsKey(uid_request)) { proxyResults.put(uid_request, delivery.getBody()); proxyResults.notifyAll(); } } } catch (InterruptedException i) { i.printStackTrace(); } catch (ShutdownSignalException e) { e.printStackTrace(); } catch (ConsumerCancelledException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } } /** * Static function which initializes the ResponseListener * * @param env * @throws Exception */ public static void init(Properties env) throws Exception { if (rListener == null) { rListener = new ResponseListener(env); rListener.start(); } else { throw new Exception("Cannot init because it already exists"); } } /** * Method to retrieve the unique ResponseListener, this function can also * initialize a ResponseListener using and environment * * @param env * @return unique ResponseListener * @throws Exception */ public static ResponseListener getRequestListener(Properties env) throws Exception { if (rListener == null) { rListener = new ResponseListener(env); rListener.start(); } else { // TODO: create a new exception to indicate that a response listener // cannot be init throw new Exception("Cannot init because it already exists"); } return rListener; } public static boolean isVoid() { return rListener == null; } /** * Method to retrieve the unique ResponseListener * * @return * @throws Exception */ public static ResponseListener getRequestListener() throws Exception { if (rListener == null) { throw new Exception("Request listener not initialized"); } return rListener; } /** * * @param key * @return whether the map has the param key */ public boolean containsKey(String key) { return results.containsKey(key); } /** * This method is used to kill the unique responseListener in the system * * @throws Exception */ public static void stopResponseListner() throws Exception { rListener.kill(); rListener = null; } // Revisar això public void registerProxy(Proxymq proxy) { if (!results.containsKey(proxy.getRef())) { results.put(proxy.getRef(), proxy.getResults()); } } }