Ignore:
Timestamp:
06/20/13 16:57:39 (11 years ago)
Author:
stoda
Message:

Non static broker
TODO: change all test to see whether the new broker configuration works

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/common/broker/Broker.java

    r50 r53  
    3535        private static final Logger logger = Logger.getLogger(Broker.class.getName());
    3636
    37         private static Connection connection;
    38         private static Channel channel;
    39         private static boolean clientStarted = false;
    40         private static boolean connectionClosed = false;
    41         private static Properties environment = null;
    42         // TODO ask Pedro if it can be only one object in the map (an object can
    43         // have multiple threads in the same broker -see environment-)
    44         private static Map<String, RemoteObject> remoteObjs;
    45 
    46         /**
    47          * Initializes a new Broker with the environment called by reference
    48          *
    49          * @param env
    50          * @throws Exception
    51          */
    52         public static synchronized void initBroker(Properties env) throws Exception {
    53                 if (environment == null) {
    54 
    55                         // Load log4j configuration
    56                         URL log4jResource = Broker.class.getResource("/log4j.xml");
    57                         DOMConfigurator.configure(log4jResource);
    58 
    59                         remoteObjs = new HashMap<String, RemoteObject>();
    60                         environment = env;
    61                         connection = OmqConnectionFactory.getNewConnection(env);
    62                         channel = connection.createChannel();
    63                         addFaultTolerance();
    64                         try {
    65                                 tryConnection(env);
    66                         } catch (Exception e) {
    67                                 channel.close();
    68                                 connection.close();
    69                                 throw new InitBrokerException("The connection didn't work");
    70                         }
    71                 } else {
    72                         logger.error("Broker is already started");
    73                         throw new InitBrokerException("Broker is already started");
    74                 }
    75         }
    76 
    77         public static void stopBroker() throws Exception {
     37        private Connection connection;
     38        private Channel channel;
     39        private ResponseListener responseListener;
     40        private EventDispatcher eventDispatcher;
     41        private Serializer serializer;
     42        private boolean clientStarted = false;
     43        private boolean connectionClosed = false;
     44        private Properties environment = null;
     45        private Map<String, RemoteObject> remoteObjs;
     46
     47        public Broker(Properties env) throws Exception {
     48                // Load log4j configuration
     49                URL log4jResource = Broker.class.getResource("/log4j.xml");
     50                DOMConfigurator.configure(log4jResource);
     51
     52                remoteObjs = new HashMap<String, RemoteObject>();
     53                serializer = new Serializer(env);
     54                environment = env;
     55                connection = OmqConnectionFactory.getNewConnection(env);
     56                channel = connection.createChannel();
     57                addFaultTolerance();
     58                try {
     59                        tryConnection(env);
     60                } catch (Exception e) {
     61                        channel.close();
     62                        connection.close();
     63                        throw new InitBrokerException("The connection didn't work");
     64                }
     65        }
     66
     67        public void stopBroker() throws Exception {
    7868                logger.warn("Stopping broker");
    7969                // Stop the client
    8070                if (clientStarted) {
    81                         ResponseListener.stopResponseListner();
    82                         EventDispatcher.stopEventDispatcher();
     71                        responseListener.kill();
     72                        eventDispatcher.kill();
    8373                        Proxymq.stopProxy();
    8474                }
     
    9585                environment = null;
    9686                remoteObjs = null;
    97                 Serializer.removeSerializers();
     87                // Serializer.removeSerializers();
    9888        }
    9989
     
    10292         * @throws Exception
    10393         */
    104         public static Connection getConnection() throws Exception {
     94        public Connection getConnection() throws Exception {
    10595                return connection;
    10696        }
    10797
    108         public static void closeConnection() throws IOException {
     98        public void closeConnection() throws IOException {
    10999                logger.warn("Clossing connection");
    110100                connectionClosed = true;
     
    118108         * @throws Exception
    119109         */
    120         public static Channel getChannel() throws Exception {
     110        public Channel getChannel() throws Exception {
    121111                return channel;
    122112        }
     
    128118         * @throws IOException
    129119         */
    130         public static Channel getNewChannel() throws IOException {
     120        public Channel getNewChannel() throws IOException {
    131121                return connection.createChannel();
    132122        }
    133123
    134124        @SuppressWarnings("unchecked")
    135         public static <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
     125        public <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
    136126                try {
    137127
     
    142132
    143133                        if (!Proxymq.containsProxy(reference)) {
    144                                 Proxymq proxy = new Proxymq(reference, contract, environment);
     134                                Proxymq proxy = new Proxymq(reference, contract, this);
    145135                                Class<?>[] array = { contract };
    146136                                return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
     
    153143        }
    154144
    155         public static void bind(String reference, RemoteObject remote) throws RemoteException {
     145        public void bind(String reference, RemoteObject remote) throws RemoteException {
    156146                try {
    157                         remote.startRemoteObject(reference, environment);
     147                        remote.startRemoteObject(reference, this);
    158148                        remoteObjs.put(reference, remote);
    159149                } catch (Exception e) {
     
    162152        }
    163153
    164         public static void unbind(String reference) throws RemoteException, IOException {
     154        public void unbind(String reference) throws RemoteException, IOException {
    165155                if (remoteObjs.containsKey(reference)) {
    166156                        RemoteObject remote = remoteObjs.get(reference);
     
    183173         * @throws Exception
    184174         */
    185         private static synchronized void initClient(Properties environment) throws Exception {
    186                 if (ResponseListener.isVoid()) {
    187                         ResponseListener.init(environment);
    188                 }
    189                 if (EventDispatcher.isVoid()) {
    190                         EventDispatcher.init(environment);
     175        private synchronized void initClient(Properties environment) throws Exception {
     176                if (responseListener == null) {
     177                        responseListener = new ResponseListener(this);
     178                }
     179                if (eventDispatcher == null) {
     180                        eventDispatcher = new EventDispatcher(this);
    191181                }
    192182        }
     
    199189         * @throws SerializerException
    200190         */
    201         public static void trigger(Event event) throws IOException, SerializerException {
     191        public void trigger(Event event) throws IOException, SerializerException {
    202192                String UID = event.getTopic();
    203193                EventWrapper wrapper = new EventWrapper(event);
     
    205195                channel.exchangeDeclare(UID, "fanout");
    206196
    207                 byte[] bytesResponse = Serializer.serialize(wrapper);
     197                byte[] bytesResponse = serializer.serialize(wrapper);
    208198                channel.basicPublish(UID, "", null, bytesResponse);
    209199
     
    218208         * @throws Exception
    219209         */
    220         public static void tryConnection(Properties env) throws Exception {
     210        public void tryConnection(Properties env) throws Exception {
    221211                Channel channel = connection.createChannel();
    222212                String message = "ping";
     
    252242         * have the listener.
    253243         */
    254         private static void addFaultTolerance() {
     244        private void addFaultTolerance() {
    255245                connection.addShutdownListener(new ShutdownListener() {
    256246                        @Override
     
    287277        }
    288278
    289         public static Properties getEnvironment() {
     279        public Properties getEnvironment() {
    290280                return environment;
    291281        }
    292282
     283        public ResponseListener getResponseListener() {
     284                return responseListener;
     285        }
     286
     287        public EventDispatcher getEventDispatcher() {
     288                return eventDispatcher;
     289        }
     290
     291        public Serializer getSerializer() {
     292                return serializer;
     293        }
    293294}
Note: See TracChangeset for help on using the changeset viewer.