Changeset 36


Ignore:
Timestamp:
06/11/13 18:25:36 (11 years ago)
Author:
stoda
Message:

Fault tolerance in the client side added.
Some refactoring in proxymq, broker, environment and the client listeners

Location:
trunk/objectmq
Files:
3 added
5 edited

Legend:

Unmodified
Added
Removed
  • trunk/objectmq/src/omq/client/listener/ResponseListener.java

    r35 r36  
    88
    99import omq.client.proxy.Proxymq;
    10 import omq.common.util.OmqConnectionFactory;
     10import omq.common.broker.Broker;
    1111import omq.common.util.ParameterQueue;
    1212
    1313import com.rabbitmq.client.AMQP.BasicProperties;
    1414import com.rabbitmq.client.Channel;
    15 import com.rabbitmq.client.Connection;
    1615import com.rabbitmq.client.ConsumerCancelledException;
    1716import com.rabbitmq.client.QueueingConsumer;
     
    2928        private static ResponseListener rListener;
    3029
    31         private Connection connection;
    3230        private Channel channel;
    3331        private QueueingConsumer consumer;
    3432        private boolean killed = false;
    3533        private Map<String, Map<String, byte[]>> results;
     34        private Properties env;
    3635
    3736        /**
     
    4241         */
    4342        protected ResponseListener(Properties env) throws Exception {
    44                 connection = OmqConnectionFactory.getNewConnection(env);
    45                 channel = connection.createChannel();
     43                this.env = env;
    4644
    4745                // Init the hashtable (it's concurrent)
    4846                this.results = new Hashtable<String, Map<String, byte[]>>();
    4947
    50                 Map<String, Object> args = null;
    51 
    52                 String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    53                 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
    54 
    55                 int ttl = Integer.parseInt(env.getProperty(ParameterQueue.MESSAGE_TTL_IN_QUEUES, "-1"));
    56                 if (ttl > 0) {
    57                         args = new HashMap<String, Object>();
    58                         args.put("x-message-ttl", ttl);
    59                 }
    60 
    61                 channel.queueDeclare(reply_queue, durable, false, false, args);
    62 
    63                 // Declare a new consumer
    64                 consumer = new QueueingConsumer(channel);
    65                 channel.basicConsume(reply_queue, true, consumer);
     48                startRPCQueue();
    6649        }
    6750
     
    9881                        } catch (ShutdownSignalException e) {
    9982                                e.printStackTrace();
     83                                try {
     84                                        if (channel.isOpen()) {
     85                                                channel.close();
     86                                        }
     87                                        startRPCQueue();
     88                                } catch (Exception e1) {
     89                                        e1.printStackTrace();
     90                                        try {
     91                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
     92                                                Thread.sleep(milis);
     93                                        } catch (InterruptedException e2) {
     94                                                e2.printStackTrace();
     95                                        }
     96                                }
    10097                        } catch (ConsumerCancelledException e) {
    10198                                e.printStackTrace();
     
    104101                        }
    105102                }
     103        }
     104
     105        private void startRPCQueue() throws Exception {
     106                channel = Broker.getNewChannel();
     107
     108                Map<String, Object> args = null;
     109
     110                String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
     111                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
     112
     113                int ttl = Integer.parseInt(env.getProperty(ParameterQueue.MESSAGE_TTL_IN_QUEUES, "-1"));
     114                if (ttl > 0) {
     115                        args = new HashMap<String, Object>();
     116                        args.put("x-message-ttl", ttl);
     117                }
     118
     119                channel.queueDeclare(reply_queue, durable, false, false, args);
     120
     121                // Declare a new consumer
     122                consumer = new QueueingConsumer(channel);
     123                channel.basicConsume(reply_queue, true, consumer);
    106124        }
    107125
     
    158176        }
    159177
    160         public synchronized Channel getChannel() throws Exception {
    161                 return connection.createChannel();
    162         }
    163 
    164178        /**
    165179         *
     
    190204                killed = true;
    191205                channel.close();
    192                 connection.close();
    193206        }
    194207
  • trunk/objectmq/src/omq/client/proxy/Proxymq.java

    r35 r36  
    1515import omq.client.annotation.SyncMethod;
    1616import omq.client.listener.ResponseListener;
     17import omq.common.broker.Broker;
    1718import omq.common.event.Event;
    1819import omq.common.event.EventDispatcher;
     
    2829
    2930import com.rabbitmq.client.AMQP.BasicProperties;
    30 import com.rabbitmq.client.Channel;
    3131
    3232/**
     
    4848        private transient ResponseListener rListener;
    4949        private transient EventDispatcher dispatcher;
    50         private transient Channel channel;
     50        // private transient Channel channel;
    5151        private transient Properties env;
    5252        private transient Map<String, byte[]> results;
     
    8585                this.dispatcher = EventDispatcher.getDispatcher();
    8686
    87                 this.channel = rListener.getChannel();
     87                // TODO what is better to use a new channel or to use the same?
     88                // this.channel = Broker.getChannel();
    8889                this.env = env;
    8990
     
    9798        @Override
    9899        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
    99                 //long timeStart = (new Date()).getTime();
    100                
     100                // long timeStart = (new Date()).getTime();
     101
    101102                // Local methods only
    102103                String methodName = method.getName();
     
    120121                Request request = createRequest(method, arguments);
    121122
    122                 //Log.saveTimeSendRequestLog("Client-time-request", request.getId(), method.getName(), timeStart);
    123                
     123                // Log.saveTimeSendRequestLog("Client-time-request", request.getId(),
     124                // method.getName(), timeStart);
     125
    124126                Object response = null;
    125127                // Publish the request
     
    128130                        publishAsyncRequest(request);
    129131                } else {
    130                         System.out.println("Publish sync request -> " + request.getId());                       
     132                        System.out.println("Publish sync request -> " + request.getId());
    131133                        response = publishSyncRequest(request, method.getReturnType());
    132                        
    133                         //long timeEnd = (new Date()).getTime();
    134                         //Log.saveTimeSendRequestLog("Client-time-response", request.getId(), method.getName(), timeEnd);
    135                 }
    136                
     134
     135                        // long timeEnd = (new Date()).getTime();
     136                        // Log.saveTimeSendRequestLog("Client-time-response",
     137                        // request.getId(), method.getName(), timeEnd);
     138                }
     139
    137140                return response;
    138141        }
    139142
    140         private void publishMessage(Request request, String replyQueueName) throws IOException, SerializerException {
     143        private void publishMessage(Request request, String replyQueueName) throws Exception {
    141144                String corrId = request.getId();
    142145
     
    150153                // Publish the message
    151154                byte[] bytesRequest = Serializer.serialize(request);
    152                 channel.basicPublish(exchange, routingkey, props, bytesRequest);
    153                 //Log.saveLog("Client-Serialize", bytesRequest);
    154         }
    155 
    156         private void publishAsyncRequest(Request request) throws IOException, SerializerException {
     155                // TODO See this
     156                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
     157                Broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
     158                // Log.saveLog("Client-Serialize", bytesRequest);
     159        }
     160
     161        private void publishAsyncRequest(Request request) throws Exception {
    157162                // Get the environment properties
    158163                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
     
    221226                        }
    222227                        resp = Serializer.deserializeResponse(results.get(corrId), type);
    223                         //Log.saveLog("Client-Deserialize", results.get(corrId));
    224                        
     228                        // Log.saveLog("Client-Deserialize", results.get(corrId));
     229
    225230                        // Remove and indicate the key exists (a hashmap can contain a null
    226231                        // object, using this we'll know whether a response has been
  • trunk/objectmq/src/omq/common/broker/Broker.java

    r34 r36  
    1414import omq.common.util.ParameterQueue;
    1515import omq.common.util.Serializer;
    16 import omq.exception.EnvironmentException;
    1716import omq.exception.InitBrokerException;
    1817import omq.exception.RemoteException;
     
    3332
    3433        public static void initBroker(Properties env) throws Exception {
    35                 try {
    36                         Environment.getEnvironment();
    37                 } catch (EnvironmentException ex) { // environment not set.
     34                if (Environment.isVoid()) {
    3835                        Environment.setEnvironment(env);
    3936                        connection = OmqConnectionFactory.getNewConnection(env);
     
    115112                System.out.println("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
    116113                channel.exchangeDeclare(UID, "fanout");
    117                
     114
    118115                byte[] bytesResponse = Serializer.serialize(wrapper);
    119116                channel.basicPublish(UID, "", null, bytesResponse);
    120117
    121                 //Log.saveLog("Server-Serialize", bytesResponse);                       
     118                // Log.saveLog("Server-Serialize", bytesResponse);
    122119        }
    123120
  • trunk/objectmq/src/omq/common/event/EventDispatcher.java

    r35 r36  
    4040                listeners = new HashMap<String, Vector<EventListener>>();
    4141
     42                startEventQueue();
     43
     44        }
     45
     46        private void startEventQueue() throws Exception {
    4247                // Get a new connection and a new channel
    4348                channel = Broker.getNewChannel();
     
    112117                                System.out.println("ShutdownSignalException e: " + e);
    113118                                e.printStackTrace();
     119                                try {
     120                                        if (channel.isOpen()) {
     121                                                channel.close();
     122                                        }
     123                                        startEventQueue();
     124                                } catch (Exception e1) {
     125                                        try {
     126                                                long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"));
     127                                                Thread.sleep(milis);
     128                                        } catch (InterruptedException e2) {
     129                                                e2.printStackTrace();
     130                                        }
     131                                        e1.printStackTrace();
     132                                }
    114133                        } catch (ConsumerCancelledException e) {
    115134                                System.out.println("ConsumerCancelledException e: " + e);
  • trunk/objectmq/src/omq/common/util/Environment.java

    r11 r36  
    44
    55import omq.exception.EnvironmentException;
    6 
    76
    87/**
     
    1211 */
    1312public class Environment {
    14         private static Properties env;
     13        private static Properties env = null;
    1514
    1615        /**
     
    3635                env = environment;
    3736        }
     37
     38        public static boolean isVoid() {
     39                return env == null;
     40        }
    3841}
Note: See TracChangeset for help on using the changeset viewer.