package omq.common.broker; import java.io.IOException; import java.util.Properties; import omq.Remote; import omq.client.proxy.Proxymq; import omq.client.remote.response.ResponseListener; import omq.common.event.Event; import omq.common.event.EventDispatcher; import omq.common.event.EventWrapper; import omq.common.util.Environment; import omq.common.util.OmqConnectionFactory; import omq.common.util.ParameterQueue; import omq.common.util.Serializer; import omq.exception.EnvironmentException; import omq.exception.InitBrokerException; import omq.exception.RemoteException; import omq.exception.SerializerException; import omq.server.remote.request.RemoteObject; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; public class Broker { private static Connection connection; private static Channel channel; private static boolean clientStarted = false; public static void initBroker(Properties env) throws Exception { try { Environment.getEnvironment(); } catch (EnvironmentException ex) { // environment not set. Environment.setEnvironment(env); connection = OmqConnectionFactory.getNewConnection(env); channel = connection.createChannel(); addFaultTolerance(); try { tryConnection(env); } catch (Exception e) { channel.close(); connection.close(); throw new InitBrokerException("The connection didn't work"); } } } // TODO: what happens if the connection is not set public static Connection getConnection() throws Exception { return connection; } public static Channel getChannel() throws Exception { return channel; } public static Channel getNewChannel() throws IOException { return connection.createChannel(); } public static Remote lookup(String reference, Class contract) throws RemoteException { try { Properties environment = Environment.getEnvironment(); if (!clientStarted) { initClient(environment); clientStarted = true; } if (!Proxymq.containsProxy(reference)) { Proxymq proxy = new Proxymq(reference, contract, environment); Class[] array = { contract }; return (Remote) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy); } return (Remote) Proxymq.getInstance(reference); } catch (Exception e) { throw new RemoteException(e); } } public static void bind(String reference, RemoteObject remote) throws RemoteException { try { Properties environment = Environment.getEnvironment(); remote.startRemoteObject(reference, environment); } catch (Exception e) { throw new RemoteException(e); } } public static void unbind(String reference) throws RemoteException { } public void rebind(String name, Remote obj) throws RemoteException { } private static synchronized void initClient(Properties environment) throws Exception { if (ResponseListener.isVoid()) { ResponseListener.init(environment); } if (EventDispatcher.isVoid()) { EventDispatcher.init(environment); } } public static void trigger(Event event) throws IOException, SerializerException { String UID = event.getTopic(); EventWrapper wrapper = new EventWrapper(event); System.out.println("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId()); channel.exchangeDeclare(UID, "fanout"); byte[] bytesResponse = Serializer.serialize(wrapper); channel.basicPublish(UID, "", null, bytesResponse); //Log.saveLog("Server-Serialize", bytesResponse); } public static void tryConnection(Properties env) throws Exception { Channel channel = connection.createChannel(); String message = "ping"; String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping"; String queueName = exchange; String routingKey = "routingKey"; channel.exchangeDeclare(exchange, "direct"); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchange, routingKey); channel.basicPublish(exchange, routingKey, null, message.getBytes()); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); Delivery delivery = consumer.nextDelivery(1000); channel.exchangeDelete(exchange); channel.queueDelete(queueName); channel.close(); if (!message.equalsIgnoreCase(new String(delivery.getBody()))) { throw new IOException("Ping-pong initialitzation has failed"); } } private static void addFaultTolerance() { connection.addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException cause) { if (cause.isHardError()) { if (connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } try { Properties env = Environment.getEnvironment(); connection = OmqConnectionFactory.getNewWorkingConnection(env); channel = connection.createChannel(); addFaultTolerance(); } catch (Exception e) { e.printStackTrace(); } } else { Channel channel = (Channel) cause.getReference(); if (channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }); } }