Changeset 37


Ignore:
Timestamp:
06/12/13 17:02:52 (11 years ago)
Author:
stoda
Message:

Stop broker working and unbind too.

Location:
trunk/objectmq
Files:
8 added
1 deleted
14 edited
2 copied
2 moved

Legend:

Unmodified
Added
Removed
  • trunk/objectmq/src/omq/common/broker/Broker.java

    r36 r37  
    22
    33import java.io.IOException;
     4import java.util.HashMap;
     5import java.util.Map;
    46import java.util.Properties;
    57
     
    3032        private static Channel channel;
    3133        private static boolean clientStarted = false;
    32 
    33         public static void initBroker(Properties env) throws Exception {
     34        private static boolean connectionClosed = false;
     35        // TODO ask Pedro if it can be only one object in the map (an object can
     36        // have multiple threads in the same broker -see environment-)
     37        private static Map<String, RemoteObject> remoteObjs;
     38
     39        /**
     40         * Initializes a new Broker with the environment called by reference
     41         *
     42         * @param env
     43         * @throws Exception
     44         */
     45        public static synchronized void initBroker(Properties env) throws Exception {
    3446                if (Environment.isVoid()) {
     47                        remoteObjs = new HashMap<String, RemoteObject>();
    3548                        Environment.setEnvironment(env);
    3649                        connection = OmqConnectionFactory.getNewConnection(env);
     
    4457                                throw new InitBrokerException("The connection didn't work");
    4558                        }
    46                 }
    47         }
    48 
    49         // TODO: what happens if the connection is not set
     59                } else {
     60                        throw new InitBrokerException("Broker already started");
     61                }
     62        }
     63
     64        public static void stopBroker() throws Exception {
     65                // Stop the client
     66                if (clientStarted) {
     67                        ResponseListener.stopResponseListner();
     68                        EventDispatcher.stopEventDispatcher();
     69                }
     70                // Stop all the remote objects working
     71                for (String reference : remoteObjs.keySet()) {
     72                        unbind(reference);
     73                }
     74                // Close the connection once all the listeners are died
     75                closeConnection();
     76        }
     77
     78        /**
     79         * @return Broker's connection
     80         * @throws Exception
     81         */
    5082        public static Connection getConnection() throws Exception {
    5183                return connection;
    5284        }
    5385
     86        public static void closeConnection() throws IOException {
     87                connectionClosed = true;
     88                connection.close();
     89        }
     90
     91        /**
     92         *
     93         * @return Broker's channel
     94         * @throws Exception
     95         */
    5496        public static Channel getChannel() throws Exception {
    5597                return channel;
    5698        }
    5799
     100        /**
     101         * Creates a new channel using the Broker's connection
     102         *
     103         * @return newChannel
     104         * @throws IOException
     105         */
    58106        public static Channel getNewChannel() throws IOException {
    59107                return connection.createChannel();
     
    85133                        Properties environment = Environment.getEnvironment();
    86134                        remote.startRemoteObject(reference, environment);
     135                        remoteObjs.put(reference, remote);
    87136                } catch (Exception e) {
    88137                        throw new RemoteException(e);
     
    90139        }
    91140
    92         public static void unbind(String reference) throws RemoteException {
     141        public static void unbind(String reference) throws RemoteException, IOException {
     142                if (remoteObjs.containsKey(reference)) {
     143                        RemoteObject remote = remoteObjs.get(reference);
     144                        remote.kill();
     145                } else {
     146                        throw new RemoteException("The object referenced by 'reference' does not exist in the Broker");
     147                }
    93148
    94149        }
     
    98153        }
    99154
     155        /**
     156         * This method ensures the client will have only one ResponseListener and
     157         * only one EventDispatcher. Both with the same environment.
     158         *
     159         * @param environment
     160         * @throws Exception
     161         */
    100162        private static synchronized void initClient(Properties environment) throws Exception {
    101163                if (ResponseListener.isVoid()) {
     
    107169        }
    108170
     171        /**
     172         * This method sends an event with its information
     173         *
     174         * @param event
     175         * @throws IOException
     176         * @throws SerializerException
     177         */
    109178        public static void trigger(Event event) throws IOException, SerializerException {
    110179                String UID = event.getTopic();
     
    119188        }
    120189
     190        /**
     191         * This function is used to send a ping message to see if the connection
     192         * works
     193         *
     194         * @param env
     195         * @throws Exception
     196         */
    121197        public static void tryConnection(Properties env) throws Exception {
     198                System.out.println("hola");
    122199                Channel channel = connection.createChannel();
    123200                String message = "ping";
     
    148225        }
    149226
     227        /**
     228         * This method adds a ShutdownListener to the Broker's connection. When this
     229         * connection falls, a new connection will be created and this will also
     230         * have the listener.
     231         */
    150232        private static void addFaultTolerance() {
    151233                connection.addShutdownListener(new ShutdownListener() {
    152234                        @Override
    153235                        public void shutdownCompleted(ShutdownSignalException cause) {
    154 
    155                                 if (cause.isHardError()) {
    156                                         if (connection.isOpen()) {
     236                                if (!connectionClosed)
     237                                        if (cause.isHardError()) {
     238                                                if (connection.isOpen()) {
     239                                                        try {
     240                                                                connection.close();
     241                                                        } catch (IOException e) {
     242                                                                e.printStackTrace();
     243                                                        }
     244                                                }
    157245                                                try {
    158                                                         connection.close();
    159                                                 } catch (IOException e) {
     246                                                        Properties env = Environment.getEnvironment();
     247                                                        connection = OmqConnectionFactory.getNewWorkingConnection(env);
     248                                                        channel = connection.createChannel();
     249                                                        addFaultTolerance();
     250                                                } catch (Exception e) {
    160251                                                        e.printStackTrace();
    161252                                                }
    162                                         }
    163                                         try {
    164                                                 Properties env = Environment.getEnvironment();
    165                                                 connection = OmqConnectionFactory.getNewWorkingConnection(env);
    166                                                 channel = connection.createChannel();
    167                                                 addFaultTolerance();
    168                                         } catch (Exception e) {
    169                                                 e.printStackTrace();
    170                                         }
    171                                 } else {
    172                                         Channel channel = (Channel) cause.getReference();
    173                                         if (channel.isOpen()) {
    174                                                 try {
    175                                                         channel.close();
    176                                                 } catch (IOException e) {
    177                                                         e.printStackTrace();
     253                                        } else {
     254                                                Channel channel = (Channel) cause.getReference();
     255                                                if (channel.isOpen()) {
     256                                                        try {
     257                                                                channel.close();
     258                                                        } catch (IOException e) {
     259                                                                e.printStackTrace();
     260                                                        }
    178261                                                }
    179262                                        }
    180                                 }
    181263                        }
    182264                });
  • trunk/objectmq/src/omq/server/InvocationThread.java

    r35 r37  
    6969                        }
    7070
    71                 }
     71                }System.out.println("Invocation Thread dies!!");
    7272        }
    7373
  • trunk/objectmq/src/omq/server/RemoteObject.java

    r35 r37  
    129129
    130130        public void kill() throws IOException {
     131                killed = true;
    131132                interrupt();
    132                 killed = true;
    133133                channel.close();
    134134                remoteWrapper.stopRemoteWrapper();
  • trunk/objectmq/test/faultToleranceTest/ClientTest.java

    r36 r37  
    1 package clientToleranceTest;
     1package faultToleranceTest;
    22
    33import static org.junit.Assert.assertEquals;
  • trunk/objectmq/test/faultToleranceTest/ServerTest.java

    r36 r37  
    1 package clientToleranceTest;
     1package faultToleranceTest;
    22
    33import java.util.Properties;
  • trunk/objectmq/test/multiThreadTest/Car.java

    r29 r37  
    1 package test2;
     1package multiThreadTest;
    22
    33import java.util.List;
  • trunk/objectmq/test/multiThreadTest/CarImpl.java

    r34 r37  
    1 package test2;
     1package multiThreadTest;
    22
    33import java.util.List;
  • trunk/objectmq/test/multiThreadTest/CarThread.java

    r29 r37  
    1 package test2;
     1package multiThreadTest;
    22
    33import java.util.ArrayList;
  • trunk/objectmq/test/multiThreadTest/ClientTest.java

    r29 r37  
    1 package test2;
     1package multiThreadTest;
    22
    33import java.util.Properties;
  • trunk/objectmq/test/multiThreadTest/Mobile.java

    r29 r37  
    1 package test2;
     1package multiThreadTest;
    22
    33import java.util.List;
  • trunk/objectmq/test/multiThreadTest/MobileImpl.java

    r34 r37  
    1 package test2;
     1package multiThreadTest;
    22
    33import java.util.ArrayList;
  • trunk/objectmq/test/multiThreadTest/MobileThread.java

    r29 r37  
    1 package test2;
     1package multiThreadTest;
    22
    33import omq.common.broker.Broker;
  • trunk/objectmq/test/multiThreadTest/Rim.java

    r28 r37  
    1 package test2;
     1package multiThreadTest;
    22
    33import java.io.Serializable;
  • trunk/objectmq/test/multiThreadTest/ServerTest.java

    r29 r37  
    1 package test2;
     1package multiThreadTest;
    22
    33import java.util.Properties;
  • trunk/objectmq/test/test/Client.java

    r27 r37  
    2525
    2626        @AsyncMethod
    27         public void sendContact(String contact) throws RemoteException;
     27        public void addContact(String contact) throws RemoteException;
    2828}
  • trunk/objectmq/test/test/ClientImpl.java

    r34 r37  
    5353        @Override
    5454        @AsyncMethod
    55         public void sendContact(String contact) throws RemoteException {
     55        public void addContact(String contact) throws RemoteException {
    5656                if (!id.equalsIgnoreCase(contact) && !friendList.containsKey(contact)) {
    5757                        Client client = (Client) Broker.lookup(contact, Client.class);
Note: See TracChangeset for help on using the changeset viewer.