Changeset 53 for trunk/src/main/java


Ignore:
Timestamp:
06/20/13 16:57:39 (11 years ago)
Author:
stoda
Message:

Non static broker
TODO: change all test to see whether the new broker configuration works

Location:
trunk/src/main/java/omq
Files:
1 deleted
8 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/listener/ResponseListener.java

    r50 r53  
    2828 */
    2929public class ResponseListener extends Thread {
    30         private static final Logger logger = Logger.getLogger(ResponseListener.class.getName());
    31         private static ResponseListener rListener;
     30        private final Logger logger = Logger.getLogger(ResponseListener.class.getName());
    3231
     32        private Broker broker;
    3333        private Channel channel;
    3434        private QueueingConsumer consumer;
     
    4343         * @throws Exception
    4444         */
    45         protected ResponseListener(Properties env) throws Exception {
    46                 this.env = env;
     45        public ResponseListener(Broker broker) throws Exception {
     46                this.broker = broker;
     47                env = broker.getEnvironment();
    4748
    4849                // Init the hashtable (it's concurrent)
    49                 this.results = new Hashtable<String, Map<String, byte[]>>();
     50                results = new Hashtable<String, Map<String, byte[]>>();
    5051
    5152                startRPCQueue();
     
    107108
    108109        private void startRPCQueue() throws Exception {
    109                 channel = Broker.getNewChannel();
     110                channel = broker.getNewChannel();
    110111
    111112                Map<String, Object> args = null;
     
    128129
    129130        /**
    130          * Static function which initializes the ResponseListener
    131          *
    132          * @param env
    133          * @throws Exception
    134          */
    135         public static void init(Properties env) throws Exception {
    136                 if (rListener == null) {
    137                         rListener = new ResponseListener(env);
    138                         rListener.start();
    139                 } else {
    140                         throw new Exception("Cannot init because it already exists");
    141                 }
    142         }
    143 
    144         /**
    145          * Method to retrieve the unique ResponseListener, this function can also
    146          * initialize a ResponseListener using and environment
    147          *
    148          * @param env
    149          * @return unique ResponseListener
    150          * @throws Exception
    151          */
    152         public static ResponseListener getRequestListener(Properties env) throws Exception {
    153                 if (rListener == null) {
    154                         rListener = new ResponseListener(env);
    155                         rListener.start();
    156                 } else {
    157                         // TODO: create a new exception to indicate that a response listener
    158                         // cannot be init
    159                         throw new Exception("Cannot init because it already exists");
    160                 }
    161                 return rListener;
    162         }
    163 
    164         public static boolean isVoid() {
    165                 return rListener == null;
    166         }
    167 
    168         /**
    169          * Method to retrieve the unique ResponseListener
    170          *
    171          * @return
    172          * @throws Exception
    173          */
    174         public static ResponseListener getRequestListener() throws Exception {
    175                 if (rListener == null) {
    176                         throw new Exception("Request listener not initialized");
    177                 }
    178                 return rListener;
    179         }
    180 
    181         /**
    182131         *
    183132         * @param key
     
    186135        public boolean containsKey(String key) {
    187136                return results.containsKey(key);
    188         }
    189 
    190         /**
    191          * This method is used to kill the unique responseListener in the system
    192          *
    193          * @throws Exception
    194          */
    195         public static void stopResponseListner() throws Exception {
    196                 rListener.kill();
    197                 rListener = null;
    198137        }
    199138
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r49 r53  
    5151        private String uid;
    5252        private transient String serializerType;
     53        private transient Broker broker;
    5354        private transient ResponseListener rListener;
    5455        private transient EventDispatcher dispatcher;
     56        private transient Serializer serializer;
    5557        // private transient Channel channel;
    5658        private transient Properties env;
     
    8587         * @throws Exception
    8688         */
    87         public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception {
     89        public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception {
    8890                this.uid = uid;
    89                 this.rListener = ResponseListener.getRequestListener();
    90                 this.dispatcher = EventDispatcher.getDispatcher();
     91                this.broker = broker;
     92                rListener = broker.getResponseListener();
     93                dispatcher = broker.getEventDispatcher();
     94                serializer = broker.getSerializer();
    9195
    9296                // TODO what is better to use a new channel or to use the same?
    9397                // this.channel = Broker.getChannel();
    94                 this.env = env;
     98                env = broker.getEnvironment();
    9599
    96100                // set the serializer type
     
    160164
    161165                // Publish the message
    162                 byte[] bytesRequest = Serializer.serialize(serializerType, request);
     166                byte[] bytesRequest = serializer.serialize(serializerType, request);
    163167                // TODO See this
    164168                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
    165                 Broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
     169                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
    166170                // Log.saveLog("Client-Serialize", bytesRequest);
    167171        }
     
    232236                                throw new TimeoutException("Timeout exception time: " + timeout);
    233237                        }
    234                         resp = Serializer.deserializeResponse(results.get(corrId), type);
     238                        resp = serializer.deserializeResponse(results.get(corrId), type);
    235239                        // Log.saveLog("Client-Deserialize", results.get(corrId));
    236240
  • trunk/src/main/java/omq/common/broker/Broker.java

    r50 r53  
    3535        private static final Logger logger = Logger.getLogger(Broker.class.getName());
    3636
    37         private static Connection connection;
    38         private static Channel channel;
    39         private static boolean clientStarted = false;
    40         private static boolean connectionClosed = false;
    41         private static Properties environment = null;
    42         // TODO ask Pedro if it can be only one object in the map (an object can
    43         // have multiple threads in the same broker -see environment-)
    44         private static Map<String, RemoteObject> remoteObjs;
    45 
    46         /**
    47          * Initializes a new Broker with the environment called by reference
    48          *
    49          * @param env
    50          * @throws Exception
    51          */
    52         public static synchronized void initBroker(Properties env) throws Exception {
    53                 if (environment == null) {
    54 
    55                         // Load log4j configuration
    56                         URL log4jResource = Broker.class.getResource("/log4j.xml");
    57                         DOMConfigurator.configure(log4jResource);
    58 
    59                         remoteObjs = new HashMap<String, RemoteObject>();
    60                         environment = env;
    61                         connection = OmqConnectionFactory.getNewConnection(env);
    62                         channel = connection.createChannel();
    63                         addFaultTolerance();
    64                         try {
    65                                 tryConnection(env);
    66                         } catch (Exception e) {
    67                                 channel.close();
    68                                 connection.close();
    69                                 throw new InitBrokerException("The connection didn't work");
    70                         }
    71                 } else {
    72                         logger.error("Broker is already started");
    73                         throw new InitBrokerException("Broker is already started");
    74                 }
    75         }
    76 
    77         public static void stopBroker() throws Exception {
     37        private Connection connection;
     38        private Channel channel;
     39        private ResponseListener responseListener;
     40        private EventDispatcher eventDispatcher;
     41        private Serializer serializer;
     42        private boolean clientStarted = false;
     43        private boolean connectionClosed = false;
     44        private Properties environment = null;
     45        private Map<String, RemoteObject> remoteObjs;
     46
     47        public Broker(Properties env) throws Exception {
     48                // Load log4j configuration
     49                URL log4jResource = Broker.class.getResource("/log4j.xml");
     50                DOMConfigurator.configure(log4jResource);
     51
     52                remoteObjs = new HashMap<String, RemoteObject>();
     53                serializer = new Serializer(env);
     54                environment = env;
     55                connection = OmqConnectionFactory.getNewConnection(env);
     56                channel = connection.createChannel();
     57                addFaultTolerance();
     58                try {
     59                        tryConnection(env);
     60                } catch (Exception e) {
     61                        channel.close();
     62                        connection.close();
     63                        throw new InitBrokerException("The connection didn't work");
     64                }
     65        }
     66
     67        public void stopBroker() throws Exception {
    7868                logger.warn("Stopping broker");
    7969                // Stop the client
    8070                if (clientStarted) {
    81                         ResponseListener.stopResponseListner();
    82                         EventDispatcher.stopEventDispatcher();
     71                        responseListener.kill();
     72                        eventDispatcher.kill();
    8373                        Proxymq.stopProxy();
    8474                }
     
    9585                environment = null;
    9686                remoteObjs = null;
    97                 Serializer.removeSerializers();
     87                // Serializer.removeSerializers();
    9888        }
    9989
     
    10292         * @throws Exception
    10393         */
    104         public static Connection getConnection() throws Exception {
     94        public Connection getConnection() throws Exception {
    10595                return connection;
    10696        }
    10797
    108         public static void closeConnection() throws IOException {
     98        public void closeConnection() throws IOException {
    10999                logger.warn("Clossing connection");
    110100                connectionClosed = true;
     
    118108         * @throws Exception
    119109         */
    120         public static Channel getChannel() throws Exception {
     110        public Channel getChannel() throws Exception {
    121111                return channel;
    122112        }
     
    128118         * @throws IOException
    129119         */
    130         public static Channel getNewChannel() throws IOException {
     120        public Channel getNewChannel() throws IOException {
    131121                return connection.createChannel();
    132122        }
    133123
    134124        @SuppressWarnings("unchecked")
    135         public static <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
     125        public <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
    136126                try {
    137127
     
    142132
    143133                        if (!Proxymq.containsProxy(reference)) {
    144                                 Proxymq proxy = new Proxymq(reference, contract, environment);
     134                                Proxymq proxy = new Proxymq(reference, contract, this);
    145135                                Class<?>[] array = { contract };
    146136                                return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
     
    153143        }
    154144
    155         public static void bind(String reference, RemoteObject remote) throws RemoteException {
     145        public void bind(String reference, RemoteObject remote) throws RemoteException {
    156146                try {
    157                         remote.startRemoteObject(reference, environment);
     147                        remote.startRemoteObject(reference, this);
    158148                        remoteObjs.put(reference, remote);
    159149                } catch (Exception e) {
     
    162152        }
    163153
    164         public static void unbind(String reference) throws RemoteException, IOException {
     154        public void unbind(String reference) throws RemoteException, IOException {
    165155                if (remoteObjs.containsKey(reference)) {
    166156                        RemoteObject remote = remoteObjs.get(reference);
     
    183173         * @throws Exception
    184174         */
    185         private static synchronized void initClient(Properties environment) throws Exception {
    186                 if (ResponseListener.isVoid()) {
    187                         ResponseListener.init(environment);
    188                 }
    189                 if (EventDispatcher.isVoid()) {
    190                         EventDispatcher.init(environment);
     175        private synchronized void initClient(Properties environment) throws Exception {
     176                if (responseListener == null) {
     177                        responseListener = new ResponseListener(this);
     178                }
     179                if (eventDispatcher == null) {
     180                        eventDispatcher = new EventDispatcher(this);
    191181                }
    192182        }
     
    199189         * @throws SerializerException
    200190         */
    201         public static void trigger(Event event) throws IOException, SerializerException {
     191        public void trigger(Event event) throws IOException, SerializerException {
    202192                String UID = event.getTopic();
    203193                EventWrapper wrapper = new EventWrapper(event);
     
    205195                channel.exchangeDeclare(UID, "fanout");
    206196
    207                 byte[] bytesResponse = Serializer.serialize(wrapper);
     197                byte[] bytesResponse = serializer.serialize(wrapper);
    208198                channel.basicPublish(UID, "", null, bytesResponse);
    209199
     
    218208         * @throws Exception
    219209         */
    220         public static void tryConnection(Properties env) throws Exception {
     210        public void tryConnection(Properties env) throws Exception {
    221211                Channel channel = connection.createChannel();
    222212                String message = "ping";
     
    252242         * have the listener.
    253243         */
    254         private static void addFaultTolerance() {
     244        private void addFaultTolerance() {
    255245                connection.addShutdownListener(new ShutdownListener() {
    256246                        @Override
     
    287277        }
    288278
    289         public static Properties getEnvironment() {
     279        public Properties getEnvironment() {
    290280                return environment;
    291281        }
    292282
     283        public ResponseListener getResponseListener() {
     284                return responseListener;
     285        }
     286
     287        public EventDispatcher getEventDispatcher() {
     288                return eventDispatcher;
     289        }
     290
     291        public Serializer getSerializer() {
     292                return serializer;
     293        }
    293294}
  • trunk/src/main/java/omq/common/event/EventDispatcher.java

    r49 r53  
    2929public class EventDispatcher extends Thread {
    3030        private static final Logger logger = Logger.getLogger(EventDispatcher.class.getName());
    31         private static EventDispatcher dispatcher;
    3231
     32        private Broker broker;
     33        private Serializer serializer;
    3334        private Map<String, Vector<EventListener>> listeners;
    3435        private Channel channel;
     
    3738        private boolean killed = false;
    3839
    39         private EventDispatcher(Properties env) throws Exception {
    40                 this.env = env;
     40        public EventDispatcher(Broker broker) throws Exception {
     41                this.broker = broker;
     42                env = broker.getEnvironment();
     43                serializer = broker.getSerializer();
    4144
    4245                // Declare the listeners map
     
    4447
    4548                startEventQueue();
    46 
    4749        }
    4850
    4951        private void startEventQueue() throws Exception {
    5052                // Get a new connection and a new channel
    51                 channel = Broker.getNewChannel();
     53                channel = broker.getNewChannel();
    5254
    5355                String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
     
    6062        }
    6163
    62         public static void init(Properties env) throws Exception {
    63                 if (dispatcher == null) {
    64                         dispatcher = new EventDispatcher(env);
    65                         dispatcher.start();
    66                 } else {
    67                         throw new Exception("Already initialized");
    68                 }
    69         }
    70 
    71         public static void stopEventDispatcher() throws Exception {
     64        public void kill() throws Exception {
    7265                logger.warn("Stopping EventDispatcher");
    73                 dispatcher.setListeners(null);
    74                 dispatcher.killed = true;
    75                 dispatcher.interrupt();
    76                 dispatcher.channel.close();
    77                 dispatcher = null;
    78         }
    79 
    80         public static EventDispatcher getDispatcher(Properties env) throws Exception {
    81                 if (dispatcher == null) {
    82                         dispatcher = new EventDispatcher(env);
    83                         dispatcher.start();
    84                 }
    85                 return dispatcher;
    86         }
    87 
    88         public static EventDispatcher getDispatcher() throws Exception {
    89                 if (dispatcher == null) {
    90                         throw new Exception("EventDispatcher not initialized");
    91                 }
    92                 return dispatcher;
     66                setListeners(null);
     67                killed = true;
     68                interrupt();
     69                channel.close();
    9370        }
    9471
     
    10481
    10582                                // Get the event
    106                                 event = Serializer.deserializeEvent(delivery.getBody());
     83                                event = serializer.deserializeEvent(delivery.getBody());
    10784
    10885                                logger.info("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
     
    210187        }
    211188
    212         public static boolean isVoid() {
    213                 return dispatcher == null;
    214         }
    215 
    216189}
  • trunk/src/main/java/omq/common/util/Serializer.java

    r49 r53  
    44import java.util.Properties;
    55
    6 import org.apache.log4j.Logger;
    7 
    8 import omq.common.broker.Broker;
    96import omq.common.event.Event;
    107import omq.common.message.Request;
     
    2320 */
    2421public class Serializer {
    25         private static final Logger logger = Logger.getLogger(Serializer.class.getName());
    26         public static String kryo = "kryo";
    27         public static String java = "java";
    28         public static String gson = "gson";
     22        // private static final Logger logger =
     23        // Logger.getLogger(Serializer.class.getName());
     24        public static final String kryo = "kryo";
     25        public static final String java = "java";
     26        public static final String gson = "gson";
    2927
    3028        // Client serializer
    31         public static ISerializer serializer;
     29        public ISerializer serializer;
    3230
    3331        // Server serializers
    34         private static ISerializer kryoSerializer;
    35         private static ISerializer javaSerializer;
    36         private static ISerializer gsonSerializer;
     32        private ISerializer kryoSerializer;
     33        private ISerializer javaSerializer;
     34        private ISerializer gsonSerializer;
    3735
    38         private static Boolean getEnableCompression() {
    39                 Properties env = Broker.getEnvironment();
     36        private Properties env;
     37
     38        public Serializer(Properties env) {
     39                this.env = env;
     40        }
     41
     42        private Boolean getEnableCompression() {
    4043                return Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false"));
    4144        }
    4245
    43         public static ISerializer getInstance() throws SerializerException {
     46        public ISerializer getInstance() throws SerializerException {
    4447                if (serializer == null) {
    4548                        try {
    46                                 Properties env = Broker.getEnvironment();
    4749                                String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
    4850
     
    6062        }
    6163
    62         public static ISerializer getInstance(String type) throws SerializerException {
     64        public ISerializer getInstance(String type) throws SerializerException {
    6365                if (kryo.equals(type)) {
    6466                        if (kryoSerializer == null) {
     
    7981        }
    8082
    81         public static byte[] serialize(String type, Object obj) throws SerializerException {
     83        public byte[] serialize(String type, Object obj) throws SerializerException {
    8284                ISerializer instance = getInstance(type);
    8385
     
    9698
    9799        // TODO: remove this function and think about the event serialization
    98         public static byte[] serialize(Object obj) throws SerializerException {
     100        public byte[] serialize(Object obj) throws SerializerException {
    99101                ISerializer instance = getInstance();
    100102
     
    112114        }
    113115
    114         public static Request deserializeRequest(String type, byte[] bytes, RemoteObject obj) throws SerializerException {
     116        public Request deserializeRequest(String type, byte[] bytes, RemoteObject obj) throws SerializerException {
    115117                ISerializer instance = getInstance(type);
    116118
     
    128130        }
    129131
    130         public static Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException {
     132        public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException {
    131133                ISerializer instance = getInstance();
    132134
     
    144146        }
    145147
    146         public static Event deserializeEvent(byte[] bytes) throws SerializerException {
     148        public Event deserializeEvent(byte[] bytes) throws SerializerException {
    147149                ISerializer instance = getInstance();
    148150
     
    160162        }
    161163
    162         public static void removeSerializers() {
    163                 logger.warn("Removing serializers");
    164                 serializer = null;
    165                 kryoSerializer = null;
    166                 javaSerializer = null;
    167                 gsonSerializer = null;
    168         }
     164        // public static void removeSerializers() {
     165        // logger.warn("Removing serializers");
     166        // serializer = null;
     167        // kryoSerializer = null;
     168        // javaSerializer = null;
     169        // gsonSerializer = null;
     170        // }
    169171}
  • trunk/src/main/java/omq/server/InvocationThread.java

    r49 r53  
    2323        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
    2424        private RemoteObject obj;
     25        private transient Serializer serializer;
    2526        private BlockingQueue<Delivery> deliveryQueue;
    2627        private boolean killed = false;
    2728
    28         public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue) {
     29        public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue, Serializer serializer) {
    2930                this.obj = obj;
    3031                this.deliveryQueue = deliveryQueue;
     32                this.serializer = serializer;
    3133        }
    3234
     
    4143
    4244                                // Deserialize the json
    43                                 Request request = Serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
     45                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
    4446                                // Log.saveLog("Server-Deserialize", delivery.getBody());
    4547
     
    7375                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
    7476
    75                                         byte[] bytesResponse = Serializer.serialize(serializerType, resp);
     77                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
    7678                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
    7779
  • trunk/src/main/java/omq/server/RemoteObject.java

    r49 r53  
    3939        private String UID;
    4040        private Properties env;
     41        private transient Broker broker;
     42        private transient Serializer serializer;
    4143        private transient RemoteWrapper remoteWrapper;
    4244        private transient Map<String, List<Class<?>>> params;
     
    6062        }
    6163
    62         public void startRemoteObject(String reference, Properties env) throws Exception {
    63                 this.UID = reference;
    64                 this.env = env;
     64        public void startRemoteObject(String reference, Broker broker) throws Exception {
     65                this.broker = broker;
     66                UID = reference;
     67                env = broker.getEnvironment();
     68                serializer = broker.getSerializer();
    6569
    6670                params = new HashMap<String, List<Class<?>>>();
     
    7579                // Get num threads to use
    7680                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
    77                 remoteWrapper = new RemoteWrapper(this, numThreads);
     81                remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
    7882
    7983                startQueues();
     
    130134                EventWrapper wrapper = new EventWrapper(event);
    131135                channel.exchangeDeclare(UID, "fanout");
    132                 channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));
     136                channel.basicPublish(UID, "", null, serializer.serialize(wrapper));
    133137        }
    134138
     
    221225
    222226                // Start channel
    223                 channel = Broker.getNewChannel();
     227                channel = broker.getNewChannel();
    224228
    225229                // Declares and bindings
  • trunk/src/main/java/omq/server/RemoteWrapper.java

    r49 r53  
    44import java.util.concurrent.BlockingQueue;
    55import java.util.concurrent.LinkedBlockingDeque;
     6
     7import omq.common.util.Serializer;
    68
    79import org.apache.log4j.Logger;
     
    2325        private BlockingQueue<Delivery> deliveryQueue;
    2426
    25         public RemoteWrapper(RemoteObject obj, int numThreads) {
     27        public RemoteWrapper(RemoteObject obj, int numThreads, Serializer serializer) {
    2628                this.obj = obj;
    2729                this.numThreads = numThreads;
     
    3234
    3335                for (int i = 0; i < numThreads; i++) {
    34                         InvocationThread thread = new InvocationThread(obj, deliveryQueue);
     36                        InvocationThread thread = new InvocationThread(obj, deliveryQueue, serializer);
    3537                        invocationList.add(thread);
    3638                        thread.start();
Note: See TracChangeset for help on using the changeset viewer.