package omq.common.broker; import java.io.IOException; import java.net.URL; import java.util.HashMap; import java.util.Map; import java.util.Properties; import omq.Remote; import omq.client.listener.ResponseListener; import omq.client.proxy.Proxymq; import omq.common.event.Event; import omq.common.event.EventDispatcher; import omq.common.event.EventWrapper; import omq.common.util.OmqConnectionFactory; import omq.common.util.ParameterQueue; import omq.common.util.Serializer; import omq.exception.InitBrokerException; import omq.exception.RemoteException; import omq.exception.SerializerException; import omq.server.RemoteObject; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; public class Broker { private static final Logger logger = Logger.getLogger(Broker.class.getName()); private static Connection connection; private static Channel channel; private static boolean clientStarted = false; private static boolean connectionClosed = false; private static Properties environment = null; // TODO ask Pedro if it can be only one object in the map (an object can // have multiple threads in the same broker -see environment-) private static Map remoteObjs; /** * Initializes a new Broker with the environment called by reference * * @param env * @throws Exception */ public static synchronized void initBroker(Properties env) throws Exception { if (environment == null) { // Load log4j configuration URL log4jResource = Broker.class.getResource("/log4j.xml"); DOMConfigurator.configure(log4jResource); remoteObjs = new HashMap(); environment = env; connection = OmqConnectionFactory.getNewConnection(env); channel = connection.createChannel(); addFaultTolerance(); try { tryConnection(env); } catch (Exception e) { channel.close(); connection.close(); throw new InitBrokerException("The connection didn't work"); } } else { logger.error("Broker is already started"); throw new InitBrokerException("Broker is already started"); } } public static void stopBroker() throws Exception { logger.warn("Stopping broker"); // Stop the client if (clientStarted) { ResponseListener.stopResponseListner(); EventDispatcher.stopEventDispatcher(); Proxymq.stopProxy(); } // Stop all the remote objects working for (String reference : remoteObjs.keySet()) { unbind(reference); } // Close the connection once all the listeners are died closeConnection(); clientStarted = false; connectionClosed = false; environment = null; remoteObjs = null; Serializer.removeSerializers(); } /** * @return Broker's connection * @throws Exception */ public static Connection getConnection() throws Exception { return connection; } public static void closeConnection() throws IOException { logger.warn("Clossing connection"); connectionClosed = true; connection.close(); connectionClosed = false; } /** * * @return Broker's channel * @throws Exception */ public static Channel getChannel() throws Exception { return channel; } /** * Creates a new channel using the Broker's connection * * @return newChannel * @throws IOException */ public static Channel getNewChannel() throws IOException { return connection.createChannel(); } @SuppressWarnings("unchecked") public static T lookup(String reference, Class contract) throws RemoteException { try { if (!clientStarted) { initClient(environment); clientStarted = true; } if (!Proxymq.containsProxy(reference)) { Proxymq proxy = new Proxymq(reference, contract, environment); Class[] array = { contract }; return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy); } return (T) Proxymq.getInstance(reference); } catch (Exception e) { throw new RemoteException(e); } } public static void bind(String reference, RemoteObject remote) throws RemoteException { try { remote.startRemoteObject(reference, environment); remoteObjs.put(reference, remote); } catch (Exception e) { throw new RemoteException(e); } } public static void unbind(String reference) throws RemoteException, IOException { if (remoteObjs.containsKey(reference)) { RemoteObject remote = remoteObjs.get(reference); remote.kill(); } else { throw new RemoteException("The object referenced by 'reference' does not exist in the Broker"); } } public void rebind(String name, Remote obj) throws RemoteException { } /** * This method ensures the client will have only one ResponseListener and * only one EventDispatcher. Both with the same environment. * * @param environment * @throws Exception */ private static synchronized void initClient(Properties environment) throws Exception { if (ResponseListener.isVoid()) { ResponseListener.init(environment); } if (EventDispatcher.isVoid()) { EventDispatcher.init(environment); } } /** * This method sends an event with its information * * @param event * @throws IOException * @throws SerializerException */ public static void trigger(Event event) throws IOException, SerializerException { String UID = event.getTopic(); EventWrapper wrapper = new EventWrapper(event); System.out.println("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId()); channel.exchangeDeclare(UID, "fanout"); byte[] bytesResponse = Serializer.serialize(wrapper); channel.basicPublish(UID, "", null, bytesResponse); // Log.saveLog("Server-Serialize", bytesResponse); } /** * This function is used to send a ping message to see if the connection * works * * @param env * @throws Exception */ public static void tryConnection(Properties env) throws Exception { Channel channel = connection.createChannel(); String message = "ping"; String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping"; String queueName = exchange; String routingKey = "routingKey"; channel.exchangeDeclare(exchange, "direct"); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchange, routingKey); channel.basicPublish(exchange, routingKey, null, message.getBytes()); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); Delivery delivery = consumer.nextDelivery(1000); channel.exchangeDelete(exchange); channel.queueDelete(queueName); channel.close(); if (!message.equalsIgnoreCase(new String(delivery.getBody()))) { throw new IOException("Ping initialitzation has failed"); } } /** * This method adds a ShutdownListener to the Broker's connection. When this * connection falls, a new connection will be created and this will also * have the listener. */ private static void addFaultTolerance() { connection.addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException cause) { logger.warn("Shutdown message received. Cause: " + cause.getMessage()); if (!connectionClosed) if (cause.isHardError()) { if (connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } try { connection = OmqConnectionFactory.getNewWorkingConnection(environment); channel = connection.createChannel(); addFaultTolerance(); } catch (Exception e) { e.printStackTrace(); } } else { Channel channel = (Channel) cause.getReference(); if (channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }); } public static Properties getEnvironment() { return environment; } }