Ignore:
Timestamp:
06/20/13 16:57:39 (11 years ago)
Author:
stoda
Message:

Non static broker
TODO: change all test to see whether the new broker configuration works

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/common/event/EventDispatcher.java

    r49 r53  
    2929public class EventDispatcher extends Thread {
    3030        private static final Logger logger = Logger.getLogger(EventDispatcher.class.getName());
    31         private static EventDispatcher dispatcher;
    3231
     32        private Broker broker;
     33        private Serializer serializer;
    3334        private Map<String, Vector<EventListener>> listeners;
    3435        private Channel channel;
     
    3738        private boolean killed = false;
    3839
    39         private EventDispatcher(Properties env) throws Exception {
    40                 this.env = env;
     40        public EventDispatcher(Broker broker) throws Exception {
     41                this.broker = broker;
     42                env = broker.getEnvironment();
     43                serializer = broker.getSerializer();
    4144
    4245                // Declare the listeners map
     
    4447
    4548                startEventQueue();
    46 
    4749        }
    4850
    4951        private void startEventQueue() throws Exception {
    5052                // Get a new connection and a new channel
    51                 channel = Broker.getNewChannel();
     53                channel = broker.getNewChannel();
    5254
    5355                String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
     
    6062        }
    6163
    62         public static void init(Properties env) throws Exception {
    63                 if (dispatcher == null) {
    64                         dispatcher = new EventDispatcher(env);
    65                         dispatcher.start();
    66                 } else {
    67                         throw new Exception("Already initialized");
    68                 }
    69         }
    70 
    71         public static void stopEventDispatcher() throws Exception {
     64        public void kill() throws Exception {
    7265                logger.warn("Stopping EventDispatcher");
    73                 dispatcher.setListeners(null);
    74                 dispatcher.killed = true;
    75                 dispatcher.interrupt();
    76                 dispatcher.channel.close();
    77                 dispatcher = null;
    78         }
    79 
    80         public static EventDispatcher getDispatcher(Properties env) throws Exception {
    81                 if (dispatcher == null) {
    82                         dispatcher = new EventDispatcher(env);
    83                         dispatcher.start();
    84                 }
    85                 return dispatcher;
    86         }
    87 
    88         public static EventDispatcher getDispatcher() throws Exception {
    89                 if (dispatcher == null) {
    90                         throw new Exception("EventDispatcher not initialized");
    91                 }
    92                 return dispatcher;
     66                setListeners(null);
     67                killed = true;
     68                interrupt();
     69                channel.close();
    9370        }
    9471
     
    10481
    10582                                // Get the event
    106                                 event = Serializer.deserializeEvent(delivery.getBody());
     83                                event = serializer.deserializeEvent(delivery.getBody());
    10784
    10885                                logger.info("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
     
    210187        }
    211188
    212         public static boolean isVoid() {
    213                 return dispatcher == null;
    214         }
    215 
    216189}
Note: See TracChangeset for help on using the changeset viewer.