package omq.common.broker; import java.io.IOException; import java.lang.reflect.Proxy; import java.net.URL; import java.util.HashMap; import java.util.Hashtable; import java.util.Map; import java.util.Properties; import omq.Remote; import omq.client.listener.ResponseListener; import omq.client.proxy.MultiProxymq; import omq.client.proxy.Proxymq; import omq.common.util.OmqConnectionFactory; import omq.common.util.ParameterQueue; import omq.common.util.Serializer; import omq.exception.AlreadyBoundException; import omq.exception.InitBrokerException; import omq.exception.RemoteException; import omq.server.RemoteObject; import omq.supervisor.Supervisor; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; /** * A "broker" allows a new connection to a RabbitMQ server. Under this * connection it can have binded object and proxies. * * @author Sergi Toda * */ public class Broker { private static final Logger logger = Logger.getLogger(Broker.class.getName()); private Connection connection; private Channel channel; private ResponseListener responseListener; private Serializer serializer; private boolean clientStarted = false; private boolean connectionClosed = false; private Properties environment = null; private Map remoteObjs; private Map proxies = new Hashtable(); private Map multiProxies = new Hashtable(); // Supervisor private Supervisor supervisor; public Broker(Properties env) throws Exception { // Load log4j configuration URL log4jResource = Broker.class.getResource("/log4j.xml"); DOMConfigurator.configure(log4jResource); remoteObjs = new HashMap(); serializer = new Serializer(env); environment = env; connection = OmqConnectionFactory.getNewConnection(env); channel = connection.createChannel(); addFaultTolerance(); try { tryConnection(env); } catch (Exception e) { channel.close(); connection.close(); throw new InitBrokerException("The connection didn't work"); } } /** * This method stops the broker's connection and all the threads created * * @throws Exception */ public void stopBroker() throws Exception { logger.warn("Stopping broker"); // Stop the client if (clientStarted) { responseListener.kill(); // TODO proxies = null; ?? } // Stop all the remote objects working for (String reference : remoteObjs.keySet()) { unbind(reference); } // Close the connection once all the listeners are died closeConnection(); clientStarted = false; connectionClosed = false; environment = null; remoteObjs = null; } /** * @return Broker's connection * @throws Exception */ public Connection getConnection() throws Exception { return connection; } /** * This method close the broker's connection * * @throws IOException */ public void closeConnection() throws IOException { logger.warn("Clossing connection"); connectionClosed = true; connection.close(); connectionClosed = false; } /** * Return the broker's channel * * @return Broker's channel * @throws Exception */ public Channel getChannel() throws Exception { return channel; } /** * Creates a new channel using the Broker's connection * * @return newChannel * @throws IOException */ public Channel getNewChannel() throws IOException { return connection.createChannel(); } /** * Returns the remote object for specified reference. * * @param reference * - Binding name * @param contract * - Remote Interface * @return newProxy * @throws RemoteException */ @SuppressWarnings("unchecked") public synchronized T lookup(String reference, Class contract) throws RemoteException { try { if (!clientStarted) { initClient(); } if (!proxies.containsKey(reference)) { Proxymq proxy = new Proxymq(reference, contract, this); Class[] array = { contract }; Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy); proxies.put(reference, newProxy); return (T) newProxy; } return (T) proxies.get(reference); } catch (Exception e) { throw new RemoteException(e); } } /** * Returns the remote object for specified reference. This function returns * an special type of proxy, every method invoked will be multi and * asynchronous. * * @param reference * - Binding name * @param contract * - Remote Interface * @return newProxy * @throws RemoteException */ @SuppressWarnings("unchecked") public synchronized T lookupMulti(String reference, Class contract) throws RemoteException { try { if (!multiProxies.containsKey(reference)) { MultiProxymq proxy = new MultiProxymq(reference, contract, this); Class[] array = { contract }; Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy); multiProxies.put(reference, newProxy); return (T) newProxy; } return (T) multiProxies.get(reference); } catch (Exception e) { throw new RemoteException(e); } } /** * Binds the reference to the specified remote object. This function uses * the broker's environment * * @param reference * - Binding name * @param remote * - RemoteObject to bind * @throws RemoteException * If the remote operation failed * @throws AlreadyBoundException * If name is already bound. */ public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException { bind(reference, remote, environment); } /** * Binds the reference to the specified remote object. This function uses * the broker's environment * * @param reference * - Binding name * @param remote * - RemoteObject to bind * @param env * - RemoteObject environment. You can set how many threads will * be listen to the reference, the multiqueue name and the * properties of the object queue and multiqueue * @throws RemoteException * If the remote operation failed * @throws AlreadyBoundException * If name is already bound. */ public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException { if (remoteObjs.containsKey(reference)) { throw new AlreadyBoundException(reference); } // Try to start the remtoeObject listeners try { remote.startRemoteObject(reference, this, env); remoteObjs.put(reference, remote); } catch (Exception e) { throw new RemoteException(e); } } /** * Unbinds a remoteObject from its reference and kills all the threads * created. * * @param reference * - Binding name * @throws RemoteException * If the remote operation failed * @throws IOException * If there are problems while killing the threads */ public void unbind(String reference) throws RemoteException, IOException { if (remoteObjs.containsKey(reference)) { RemoteObject remote = remoteObjs.get(reference); remote.kill(); } else { throw new RemoteException("The object referenced by 'reference' does not exist in the Broker"); } } /** * This method ensures the client will have only one ResponseListener. * * @throws Exception */ private synchronized void initClient() throws Exception { if (responseListener == null) { responseListener = new ResponseListener(this); responseListener.start(); clientStarted = true; } } /** * This function is used to send a ping message to see if the connection * works * * @param env * @throws Exception */ public void tryConnection(Properties env) throws Exception { Channel channel = connection.createChannel(); String message = "ping"; String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping"; String queueName = exchange; String routingKey = "routingKey"; channel.exchangeDeclare(exchange, "direct"); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchange, routingKey); channel.basicPublish(exchange, routingKey, null, message.getBytes()); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); Delivery delivery = consumer.nextDelivery(1000); channel.exchangeDelete(exchange); channel.queueDelete(queueName); channel.close(); if (!message.equalsIgnoreCase(new String(delivery.getBody()))) { throw new IOException("Ping initialitzation has failed"); } } /** * This method adds a ShutdownListener to the Broker's connection. When this * connection falls, a new connection will be created and this will also * have the listener. */ private void addFaultTolerance() { connection.addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException cause) { logger.warn("Shutdown message received. Cause: " + cause.getMessage()); if (!connectionClosed) if (cause.isHardError()) { if (connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } try { connection = OmqConnectionFactory.getNewWorkingConnection(environment); channel = connection.createChannel(); addFaultTolerance(); } catch (Exception e) { e.printStackTrace(); } } else { Channel channel = (Channel) cause.getReference(); if (channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }); } public Properties getEnvironment() { return environment; } public ResponseListener getResponseListener() { return responseListener; } public Serializer getSerializer() { return serializer; } public Map getRemoteObjs() { return remoteObjs; } /* * Supervisor */ public void setSupervisor(String supervisorName, String brokerName) throws Exception { // Create a RemoteBrokerImpl bind(brokerName, new RemoteBrokerImpl()); // Subscribe broker supervisor = lookup(supervisorName, Supervisor.class); supervisor.subscribe(brokerName); logger.info("Supervisor set: " + supervisorName + ", BrokerName: " + brokerName); } public Supervisor getSupervisor() { return supervisor; } }