Ignore:
Timestamp:
06/20/13 16:57:39 (11 years ago)
Author:
stoda
Message:

Non static broker
TODO: change all test to see whether the new broker configuration works

Location:
trunk/src/main/java/omq/common
Files:
1 deleted
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/common/broker/Broker.java

    r50 r53  
    3535        private static final Logger logger = Logger.getLogger(Broker.class.getName());
    3636
    37         private static Connection connection;
    38         private static Channel channel;
    39         private static boolean clientStarted = false;
    40         private static boolean connectionClosed = false;
    41         private static Properties environment = null;
    42         // TODO ask Pedro if it can be only one object in the map (an object can
    43         // have multiple threads in the same broker -see environment-)
    44         private static Map<String, RemoteObject> remoteObjs;
    45 
    46         /**
    47          * Initializes a new Broker with the environment called by reference
    48          *
    49          * @param env
    50          * @throws Exception
    51          */
    52         public static synchronized void initBroker(Properties env) throws Exception {
    53                 if (environment == null) {
    54 
    55                         // Load log4j configuration
    56                         URL log4jResource = Broker.class.getResource("/log4j.xml");
    57                         DOMConfigurator.configure(log4jResource);
    58 
    59                         remoteObjs = new HashMap<String, RemoteObject>();
    60                         environment = env;
    61                         connection = OmqConnectionFactory.getNewConnection(env);
    62                         channel = connection.createChannel();
    63                         addFaultTolerance();
    64                         try {
    65                                 tryConnection(env);
    66                         } catch (Exception e) {
    67                                 channel.close();
    68                                 connection.close();
    69                                 throw new InitBrokerException("The connection didn't work");
    70                         }
    71                 } else {
    72                         logger.error("Broker is already started");
    73                         throw new InitBrokerException("Broker is already started");
    74                 }
    75         }
    76 
    77         public static void stopBroker() throws Exception {
     37        private Connection connection;
     38        private Channel channel;
     39        private ResponseListener responseListener;
     40        private EventDispatcher eventDispatcher;
     41        private Serializer serializer;
     42        private boolean clientStarted = false;
     43        private boolean connectionClosed = false;
     44        private Properties environment = null;
     45        private Map<String, RemoteObject> remoteObjs;
     46
     47        public Broker(Properties env) throws Exception {
     48                // Load log4j configuration
     49                URL log4jResource = Broker.class.getResource("/log4j.xml");
     50                DOMConfigurator.configure(log4jResource);
     51
     52                remoteObjs = new HashMap<String, RemoteObject>();
     53                serializer = new Serializer(env);
     54                environment = env;
     55                connection = OmqConnectionFactory.getNewConnection(env);
     56                channel = connection.createChannel();
     57                addFaultTolerance();
     58                try {
     59                        tryConnection(env);
     60                } catch (Exception e) {
     61                        channel.close();
     62                        connection.close();
     63                        throw new InitBrokerException("The connection didn't work");
     64                }
     65        }
     66
     67        public void stopBroker() throws Exception {
    7868                logger.warn("Stopping broker");
    7969                // Stop the client
    8070                if (clientStarted) {
    81                         ResponseListener.stopResponseListner();
    82                         EventDispatcher.stopEventDispatcher();
     71                        responseListener.kill();
     72                        eventDispatcher.kill();
    8373                        Proxymq.stopProxy();
    8474                }
     
    9585                environment = null;
    9686                remoteObjs = null;
    97                 Serializer.removeSerializers();
     87                // Serializer.removeSerializers();
    9888        }
    9989
     
    10292         * @throws Exception
    10393         */
    104         public static Connection getConnection() throws Exception {
     94        public Connection getConnection() throws Exception {
    10595                return connection;
    10696        }
    10797
    108         public static void closeConnection() throws IOException {
     98        public void closeConnection() throws IOException {
    10999                logger.warn("Clossing connection");
    110100                connectionClosed = true;
     
    118108         * @throws Exception
    119109         */
    120         public static Channel getChannel() throws Exception {
     110        public Channel getChannel() throws Exception {
    121111                return channel;
    122112        }
     
    128118         * @throws IOException
    129119         */
    130         public static Channel getNewChannel() throws IOException {
     120        public Channel getNewChannel() throws IOException {
    131121                return connection.createChannel();
    132122        }
    133123
    134124        @SuppressWarnings("unchecked")
    135         public static <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
     125        public <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
    136126                try {
    137127
     
    142132
    143133                        if (!Proxymq.containsProxy(reference)) {
    144                                 Proxymq proxy = new Proxymq(reference, contract, environment);
     134                                Proxymq proxy = new Proxymq(reference, contract, this);
    145135                                Class<?>[] array = { contract };
    146136                                return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
     
    153143        }
    154144
    155         public static void bind(String reference, RemoteObject remote) throws RemoteException {
     145        public void bind(String reference, RemoteObject remote) throws RemoteException {
    156146                try {
    157                         remote.startRemoteObject(reference, environment);
     147                        remote.startRemoteObject(reference, this);
    158148                        remoteObjs.put(reference, remote);
    159149                } catch (Exception e) {
     
    162152        }
    163153
    164         public static void unbind(String reference) throws RemoteException, IOException {
     154        public void unbind(String reference) throws RemoteException, IOException {
    165155                if (remoteObjs.containsKey(reference)) {
    166156                        RemoteObject remote = remoteObjs.get(reference);
     
    183173         * @throws Exception
    184174         */
    185         private static synchronized void initClient(Properties environment) throws Exception {
    186                 if (ResponseListener.isVoid()) {
    187                         ResponseListener.init(environment);
    188                 }
    189                 if (EventDispatcher.isVoid()) {
    190                         EventDispatcher.init(environment);
     175        private synchronized void initClient(Properties environment) throws Exception {
     176                if (responseListener == null) {
     177                        responseListener = new ResponseListener(this);
     178                }
     179                if (eventDispatcher == null) {
     180                        eventDispatcher = new EventDispatcher(this);
    191181                }
    192182        }
     
    199189         * @throws SerializerException
    200190         */
    201         public static void trigger(Event event) throws IOException, SerializerException {
     191        public void trigger(Event event) throws IOException, SerializerException {
    202192                String UID = event.getTopic();
    203193                EventWrapper wrapper = new EventWrapper(event);
     
    205195                channel.exchangeDeclare(UID, "fanout");
    206196
    207                 byte[] bytesResponse = Serializer.serialize(wrapper);
     197                byte[] bytesResponse = serializer.serialize(wrapper);
    208198                channel.basicPublish(UID, "", null, bytesResponse);
    209199
     
    218208         * @throws Exception
    219209         */
    220         public static void tryConnection(Properties env) throws Exception {
     210        public void tryConnection(Properties env) throws Exception {
    221211                Channel channel = connection.createChannel();
    222212                String message = "ping";
     
    252242         * have the listener.
    253243         */
    254         private static void addFaultTolerance() {
     244        private void addFaultTolerance() {
    255245                connection.addShutdownListener(new ShutdownListener() {
    256246                        @Override
     
    287277        }
    288278
    289         public static Properties getEnvironment() {
     279        public Properties getEnvironment() {
    290280                return environment;
    291281        }
    292282
     283        public ResponseListener getResponseListener() {
     284                return responseListener;
     285        }
     286
     287        public EventDispatcher getEventDispatcher() {
     288                return eventDispatcher;
     289        }
     290
     291        public Serializer getSerializer() {
     292                return serializer;
     293        }
    293294}
  • trunk/src/main/java/omq/common/event/EventDispatcher.java

    r49 r53  
    2929public class EventDispatcher extends Thread {
    3030        private static final Logger logger = Logger.getLogger(EventDispatcher.class.getName());
    31         private static EventDispatcher dispatcher;
    3231
     32        private Broker broker;
     33        private Serializer serializer;
    3334        private Map<String, Vector<EventListener>> listeners;
    3435        private Channel channel;
     
    3738        private boolean killed = false;
    3839
    39         private EventDispatcher(Properties env) throws Exception {
    40                 this.env = env;
     40        public EventDispatcher(Broker broker) throws Exception {
     41                this.broker = broker;
     42                env = broker.getEnvironment();
     43                serializer = broker.getSerializer();
    4144
    4245                // Declare the listeners map
     
    4447
    4548                startEventQueue();
    46 
    4749        }
    4850
    4951        private void startEventQueue() throws Exception {
    5052                // Get a new connection and a new channel
    51                 channel = Broker.getNewChannel();
     53                channel = broker.getNewChannel();
    5254
    5355                String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
     
    6062        }
    6163
    62         public static void init(Properties env) throws Exception {
    63                 if (dispatcher == null) {
    64                         dispatcher = new EventDispatcher(env);
    65                         dispatcher.start();
    66                 } else {
    67                         throw new Exception("Already initialized");
    68                 }
    69         }
    70 
    71         public static void stopEventDispatcher() throws Exception {
     64        public void kill() throws Exception {
    7265                logger.warn("Stopping EventDispatcher");
    73                 dispatcher.setListeners(null);
    74                 dispatcher.killed = true;
    75                 dispatcher.interrupt();
    76                 dispatcher.channel.close();
    77                 dispatcher = null;
    78         }
    79 
    80         public static EventDispatcher getDispatcher(Properties env) throws Exception {
    81                 if (dispatcher == null) {
    82                         dispatcher = new EventDispatcher(env);
    83                         dispatcher.start();
    84                 }
    85                 return dispatcher;
    86         }
    87 
    88         public static EventDispatcher getDispatcher() throws Exception {
    89                 if (dispatcher == null) {
    90                         throw new Exception("EventDispatcher not initialized");
    91                 }
    92                 return dispatcher;
     66                setListeners(null);
     67                killed = true;
     68                interrupt();
     69                channel.close();
    9370        }
    9471
     
    10481
    10582                                // Get the event
    106                                 event = Serializer.deserializeEvent(delivery.getBody());
     83                                event = serializer.deserializeEvent(delivery.getBody());
    10784
    10885                                logger.info("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
     
    210187        }
    211188
    212         public static boolean isVoid() {
    213                 return dispatcher == null;
    214         }
    215 
    216189}
  • trunk/src/main/java/omq/common/util/Serializer.java

    r49 r53  
    44import java.util.Properties;
    55
    6 import org.apache.log4j.Logger;
    7 
    8 import omq.common.broker.Broker;
    96import omq.common.event.Event;
    107import omq.common.message.Request;
     
    2320 */
    2421public class Serializer {
    25         private static final Logger logger = Logger.getLogger(Serializer.class.getName());
    26         public static String kryo = "kryo";
    27         public static String java = "java";
    28         public static String gson = "gson";
     22        // private static final Logger logger =
     23        // Logger.getLogger(Serializer.class.getName());
     24        public static final String kryo = "kryo";
     25        public static final String java = "java";
     26        public static final String gson = "gson";
    2927
    3028        // Client serializer
    31         public static ISerializer serializer;
     29        public ISerializer serializer;
    3230
    3331        // Server serializers
    34         private static ISerializer kryoSerializer;
    35         private static ISerializer javaSerializer;
    36         private static ISerializer gsonSerializer;
     32        private ISerializer kryoSerializer;
     33        private ISerializer javaSerializer;
     34        private ISerializer gsonSerializer;
    3735
    38         private static Boolean getEnableCompression() {
    39                 Properties env = Broker.getEnvironment();
     36        private Properties env;
     37
     38        public Serializer(Properties env) {
     39                this.env = env;
     40        }
     41
     42        private Boolean getEnableCompression() {
    4043                return Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false"));
    4144        }
    4245
    43         public static ISerializer getInstance() throws SerializerException {
     46        public ISerializer getInstance() throws SerializerException {
    4447                if (serializer == null) {
    4548                        try {
    46                                 Properties env = Broker.getEnvironment();
    4749                                String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
    4850
     
    6062        }
    6163
    62         public static ISerializer getInstance(String type) throws SerializerException {
     64        public ISerializer getInstance(String type) throws SerializerException {
    6365                if (kryo.equals(type)) {
    6466                        if (kryoSerializer == null) {
     
    7981        }
    8082
    81         public static byte[] serialize(String type, Object obj) throws SerializerException {
     83        public byte[] serialize(String type, Object obj) throws SerializerException {
    8284                ISerializer instance = getInstance(type);
    8385
     
    9698
    9799        // TODO: remove this function and think about the event serialization
    98         public static byte[] serialize(Object obj) throws SerializerException {
     100        public byte[] serialize(Object obj) throws SerializerException {
    99101                ISerializer instance = getInstance();
    100102
     
    112114        }
    113115
    114         public static Request deserializeRequest(String type, byte[] bytes, RemoteObject obj) throws SerializerException {
     116        public Request deserializeRequest(String type, byte[] bytes, RemoteObject obj) throws SerializerException {
    115117                ISerializer instance = getInstance(type);
    116118
     
    128130        }
    129131
    130         public static Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException {
     132        public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException {
    131133                ISerializer instance = getInstance();
    132134
     
    144146        }
    145147
    146         public static Event deserializeEvent(byte[] bytes) throws SerializerException {
     148        public Event deserializeEvent(byte[] bytes) throws SerializerException {
    147149                ISerializer instance = getInstance();
    148150
     
    160162        }
    161163
    162         public static void removeSerializers() {
    163                 logger.warn("Removing serializers");
    164                 serializer = null;
    165                 kryoSerializer = null;
    166                 javaSerializer = null;
    167                 gsonSerializer = null;
    168         }
     164        // public static void removeSerializers() {
     165        // logger.warn("Removing serializers");
     166        // serializer = null;
     167        // kryoSerializer = null;
     168        // javaSerializer = null;
     169        // gsonSerializer = null;
     170        // }
    169171}
Note: See TracChangeset for help on using the changeset viewer.