Changeset 24


Ignore:
Timestamp:
05/24/13 16:23:16 (11 years ago)
Author:
stoda
Message:

fault tolerance in server if the rabbitmq server falls added

Location:
trunk/objectmq/src/omq
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/objectmq/src/omq/common/broker/Broker.java

    r22 r24  
    2323import com.rabbitmq.client.Connection;
    2424import com.rabbitmq.client.QueueingConsumer;
     25import com.rabbitmq.client.ShutdownListener;
     26import com.rabbitmq.client.ShutdownSignalException;
    2527import com.rabbitmq.client.QueueingConsumer.Delivery;
    2628
     
    3638                        Environment.setEnvironment(env);
    3739                        connection = OmqConnectionFactory.getNewConnection(env);
     40                        connection.addShutdownListener(new ShutdownListener() {
     41                                @Override
     42                                public void shutdownCompleted(ShutdownSignalException cause) {
     43                                        if (connection.isOpen()) {
     44                                                try {
     45                                                        connection.close();
     46                                                } catch (IOException e) {
     47                                                        e.printStackTrace();
     48                                                }
     49                                        }
     50                                        try {
     51                                                Properties env = Environment.getEnvironment();
     52                                                connection = OmqConnectionFactory.getNewWorkingConnection(env);
     53                                                channel = connection.createChannel();
     54                                        } catch (Exception e) {
     55                                                e.printStackTrace();
     56                                        }
     57                                }
     58                        });
    3859                        channel = connection.createChannel();
    3960                        try {
     
    84105                try {
    85106                        Properties environment = Environment.getEnvironment();
    86                         remote.start(reference, environment);
     107                        remote.startRemoteObject(reference, environment);
    87108                } catch (Exception e) {
    88109                        throw new RemoteException(e);
     
    119140                String message = "ping";
    120141
    121                 String exchange = env.getProperty(ParameterQueue.USER_NAME);
    122                 String queueName = exchange + "ping";
     142                String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping";
     143                String queueName = exchange;
    123144                String routingKey = "routingKey";
    124145
     
    140161
    141162                if (!message.equalsIgnoreCase(new String(delivery.getBody()))) {
    142                         throw new Exception();
     163                        throw new IOException("Ping-pong initialitzation has failed");
    143164                }
    144165        }
     166
    145167}
  • trunk/objectmq/src/omq/common/util/OmqConnectionFactory.java

    r22 r24  
    2323                        connection = getNewConnection(env);
    2424                }
     25        }
     26
     27        public static Connection getNewWorkingConnection(Properties env) throws Exception {
     28                Connection connection = null;
     29                boolean working = false;
     30
     31                while (!working) {
     32                        try {
     33                                connection = getNewConnection(env);
     34                                working = true;
     35                        } catch (Exception e) {
     36                                e.printStackTrace();
     37                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
     38                                Thread.sleep(milis);
     39                        }
     40                }
     41
     42                return connection;
    2543        }
    2644
  • trunk/objectmq/src/omq/common/util/ParameterQueue.java

    r19 r24  
    1414        public static String SERIALIZERNAME = "revo.serializer";
    1515        public static String ENABLECOMPRESSION = "revo.compression";
    16        
     16
    1717        public static String SERVER_HOST = "revo.host";
    1818        public static String SERVER_PORT = "revo.port";
     
    3131        public static String ENABLE_SSL = "revo.enable_ssl";
    3232        public static String DEBUGFILE = "revo.debug_file";
    33        
     33
     34        public static String RETRY_TIME_CONNECTION = "omq.retry_connection";
    3435
    3536        /*
  • trunk/objectmq/src/omq/server/remote/request/RemoteObject.java

    r20 r24  
    3535
    3636        private String UID;
     37        private Properties env;
    3738        private transient RemoteWrapper remoteWrapper;
    3839        private transient Map<String, List<Class<?>>> params;
     
    5657        }
    5758
    58         public void start(String reference, Properties env) throws Exception {
     59        public void startRemoteObject(String reference, Properties env) throws Exception {
    5960                this.UID = reference;
     61                this.env = env;
    6062
    6163                params = new HashMap<String, List<Class<?>>>();
     
    7274                remoteWrapper = new RemoteWrapper(this, numThreads);
    7375
    74                 // Get info about which exchange and queue will use
    75                 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
    76                 String queue = UID;
    77                 String routingKey = UID;
    78 
    79                 // Start channel
    80                 channel = Broker.getNewChannel();
    81 
    82                 // Declares and bindings
    83                 System.out.println("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
    84                 channel.exchangeDeclare(exchange, "direct");
    85                 channel.queueDeclare(queue, false, false, false, null);
    86                 channel.queueBind(queue, exchange, routingKey);
    87 
    88                 // Declare the event topic fanout
    89                 System.out.println("RemoteObject: " + UID + " declaring fanout exchange: " + UID);
    90                 channel.exchangeDeclare(UID, "fanout");
    91 
    92                 // Declare a new consumer
    93                 consumer = new QueueingConsumer(channel);
    94                 channel.basicConsume(queue, true, consumer);
     76                startQueues();
    9577
    9678                // Start this listener
     
    10991                        } catch (ShutdownSignalException e) {
    11092                                e.printStackTrace();
     93                                try {
     94                                        if (channel.isOpen()) {
     95                                                channel.close();
     96                                        }
     97                                        startQueues();
     98                                } catch (Exception e1) {
     99                                        try {
     100                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
     101                                                Thread.sleep(milis);
     102                                        } catch (InterruptedException e2) {
     103                                                // TODO Auto-generated catch block
     104                                                e2.printStackTrace();
     105                                        }
     106                                        e1.printStackTrace();
     107                                }
    111108                        } catch (ConsumerCancelledException e) {
    112109                                e.printStackTrace();
     
    209206        }
    210207
     208        private void startQueues() throws Exception {
     209                // Get info about which exchange and queue will use
     210                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
     211                String queue = UID;
     212                String routingKey = UID;
     213
     214                // Start channel
     215                channel = Broker.getNewChannel();
     216
     217                // Declares and bindings
     218                System.out.println("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
     219                channel.exchangeDeclare(exchange, "direct");
     220                channel.queueDeclare(queue, false, false, false, null);
     221                channel.queueBind(queue, exchange, routingKey);
     222
     223                // Declare the event topic fanout
     224                System.out.println("RemoteObject: " + UID + " declaring fanout exchange: " + UID);
     225                channel.exchangeDeclare(UID, "fanout");
     226
     227                // Declare a new consumer
     228                consumer = new QueueingConsumer(channel);
     229                channel.basicConsume(queue, true, consumer);
     230        }
     231
    211232        @Override
    212233        public void addListener(EventListener eventListener) throws Exception {
  • trunk/objectmq/src/omq/ztest/calculator/ClientTest.java

    r19 r24  
    2222
    2323                // Set host info of rabbimq (where it is)
    24                 env.setProperty(ParameterQueue.SERVER_HOST, "10.30.239.228");
     24                env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
    2525                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    2626                // env.setProperty(ParameterQueue.SERIALIZERNAME,
  • trunk/objectmq/src/omq/ztest/calculator/ServerTest.java

    r18 r24  
    1616
    1717                // Get host info of rabbimq (where it is)
    18                 env.setProperty(ParameterQueue.SERVER_HOST, "10.30.239.228");
     18                env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
    1919                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    2020                // env.setProperty(ParameterQueue.SERIALIZERNAME,
     
    2525                // Set info about where the message will be sent
    2626                env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange");
     27                env.setProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000");
    2728
    2829                calc = new CalculatorImpl();
Note: See TracChangeset for help on using the changeset viewer.