Ignore:
Timestamp:
06/20/13 16:57:39 (11 years ago)
Author:
stoda
Message:

Non static broker
TODO: change all test to see whether the new broker configuration works

Location:
trunk/src/main/java/omq/client
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/listener/ResponseListener.java

    r50 r53  
    2828 */
    2929public class ResponseListener extends Thread {
    30         private static final Logger logger = Logger.getLogger(ResponseListener.class.getName());
    31         private static ResponseListener rListener;
     30        private final Logger logger = Logger.getLogger(ResponseListener.class.getName());
    3231
     32        private Broker broker;
    3333        private Channel channel;
    3434        private QueueingConsumer consumer;
     
    4343         * @throws Exception
    4444         */
    45         protected ResponseListener(Properties env) throws Exception {
    46                 this.env = env;
     45        public ResponseListener(Broker broker) throws Exception {
     46                this.broker = broker;
     47                env = broker.getEnvironment();
    4748
    4849                // Init the hashtable (it's concurrent)
    49                 this.results = new Hashtable<String, Map<String, byte[]>>();
     50                results = new Hashtable<String, Map<String, byte[]>>();
    5051
    5152                startRPCQueue();
     
    107108
    108109        private void startRPCQueue() throws Exception {
    109                 channel = Broker.getNewChannel();
     110                channel = broker.getNewChannel();
    110111
    111112                Map<String, Object> args = null;
     
    128129
    129130        /**
    130          * Static function which initializes the ResponseListener
    131          *
    132          * @param env
    133          * @throws Exception
    134          */
    135         public static void init(Properties env) throws Exception {
    136                 if (rListener == null) {
    137                         rListener = new ResponseListener(env);
    138                         rListener.start();
    139                 } else {
    140                         throw new Exception("Cannot init because it already exists");
    141                 }
    142         }
    143 
    144         /**
    145          * Method to retrieve the unique ResponseListener, this function can also
    146          * initialize a ResponseListener using and environment
    147          *
    148          * @param env
    149          * @return unique ResponseListener
    150          * @throws Exception
    151          */
    152         public static ResponseListener getRequestListener(Properties env) throws Exception {
    153                 if (rListener == null) {
    154                         rListener = new ResponseListener(env);
    155                         rListener.start();
    156                 } else {
    157                         // TODO: create a new exception to indicate that a response listener
    158                         // cannot be init
    159                         throw new Exception("Cannot init because it already exists");
    160                 }
    161                 return rListener;
    162         }
    163 
    164         public static boolean isVoid() {
    165                 return rListener == null;
    166         }
    167 
    168         /**
    169          * Method to retrieve the unique ResponseListener
    170          *
    171          * @return
    172          * @throws Exception
    173          */
    174         public static ResponseListener getRequestListener() throws Exception {
    175                 if (rListener == null) {
    176                         throw new Exception("Request listener not initialized");
    177                 }
    178                 return rListener;
    179         }
    180 
    181         /**
    182131         *
    183132         * @param key
     
    186135        public boolean containsKey(String key) {
    187136                return results.containsKey(key);
    188         }
    189 
    190         /**
    191          * This method is used to kill the unique responseListener in the system
    192          *
    193          * @throws Exception
    194          */
    195         public static void stopResponseListner() throws Exception {
    196                 rListener.kill();
    197                 rListener = null;
    198137        }
    199138
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r49 r53  
    5151        private String uid;
    5252        private transient String serializerType;
     53        private transient Broker broker;
    5354        private transient ResponseListener rListener;
    5455        private transient EventDispatcher dispatcher;
     56        private transient Serializer serializer;
    5557        // private transient Channel channel;
    5658        private transient Properties env;
     
    8587         * @throws Exception
    8688         */
    87         public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception {
     89        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
    8890                this.uid = uid;
    89                 this.rListener = ResponseListener.getRequestListener();
    90                 this.dispatcher = EventDispatcher.getDispatcher();
     91                this.broker = broker;
     92                rListener = broker.getResponseListener();
     93                dispatcher = broker.getEventDispatcher();
     94                serializer = broker.getSerializer();
    9195
    9296                // TODO what is better to use a new channel or to use the same?
    9397                // this.channel = Broker.getChannel();
    94                 this.env = env;
     98                env = broker.getEnvironment();
    9599
    96100                // set the serializer type
     
    160164
    161165                // Publish the message
    162                 byte[] bytesRequest = Serializer.serialize(serializerType, request);
     166                byte[] bytesRequest = serializer.serialize(serializerType, request);
    163167                // TODO See this
    164168                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
    165                 Broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
     169                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
    166170                // Log.saveLog("Client-Serialize", bytesRequest);
    167171        }
     
    232236                                throw new TimeoutException("Timeout exception time: " + timeout);
    233237                        }
    234                         resp = Serializer.deserializeResponse(results.get(corrId), type);
     238                        resp = serializer.deserializeResponse(results.get(corrId), type);
    235239                        // Log.saveLog("Client-Deserialize", results.get(corrId));
    236240
Note: See TracChangeset for help on using the changeset viewer.