Changeset 47 for trunk/src/main/java


Ignore:
Timestamp:
06/18/13 16:51:22 (11 years ago)
Author:
stoda
Message:

Refactoring Environment class - deleted.
StopBroker? problems solved (?)
Server can receive send and receive messages in different formats.
Some tests modified

TODO: finish all the tests, add log4j

Location:
trunk/src/main/java/omq
Files:
1 deleted
5 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r44 r47  
    4747
    4848        private String uid;
     49        private transient String serializerType;
    4950        private transient ResponseListener rListener;
    5051        private transient EventDispatcher dispatcher;
     
    8990                // this.channel = Broker.getChannel();
    9091                this.env = env;
     92
     93                // set the serializer type
     94                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
    9195
    9296                listeners = new HashMap<String, EventListener<?>>();
     
    150154
    151155                // Add the correlation ID and create a replyTo property
    152                 BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).build();
     156                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
    153157
    154158                // Publish the message
    155                 byte[] bytesRequest = Serializer.serialize(request);
     159                byte[] bytesRequest = Serializer.serialize(serializerType, request);
    156160                // TODO See this
    157161                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
     
    306310        }
    307311
     312        public static void stopProxy() {
     313                proxies = new HashMap<String, Object>();
     314        }
     315
     316        public static Map<String, Object> getProxies() {
     317                return proxies;
     318        }
     319
     320        public static void setProxies(Map<String, Object> proxies) {
     321                Proxymq.proxies = proxies;
     322        }
     323
    308324        @Override
    309325        public String getRef() {
  • trunk/src/main/java/omq/common/broker/Broker.java

    r44 r47  
    1212import omq.common.event.EventDispatcher;
    1313import omq.common.event.EventWrapper;
    14 import omq.common.util.Environment;
    1514import omq.common.util.OmqConnectionFactory;
    1615import omq.common.util.ParameterQueue;
     
    3332        private static boolean clientStarted = false;
    3433        private static boolean connectionClosed = false;
     34        private static Properties environment = null;
    3535        // TODO ask Pedro if it can be only one object in the map (an object can
    3636        // have multiple threads in the same broker -see environment-)
     
    4444         */
    4545        public static synchronized void initBroker(Properties env) throws Exception {
    46                 if (Environment.isVoid()) {
     46                if (environment == null) {
    4747                        remoteObjs = new HashMap<String, RemoteObject>();
    48                         Environment.setEnvironment(env);
     48                        environment = env;
    4949                        connection = OmqConnectionFactory.getNewConnection(env);
    5050                        channel = connection.createChannel();
     
    6767                        ResponseListener.stopResponseListner();
    6868                        EventDispatcher.stopEventDispatcher();
     69                        Proxymq.stopProxy();
    6970                }
    7071                // Stop all the remote objects working
     
    7273                        unbind(reference);
    7374                }
     75
    7476                // Close the connection once all the listeners are died
    7577                closeConnection();
     78
     79                clientStarted = false;
     80                connectionClosed = false;
     81                environment = null;
     82                remoteObjs = null;
     83                Serializer.removeSerializers();
    7684        }
    7785
     
    8795                connectionClosed = true;
    8896                connection.close();
     97                connectionClosed = false;
    8998        }
    9099
     
    111120        public static <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
    112121                try {
    113                         Properties environment = Environment.getEnvironment();
    114122
    115123                        if (!clientStarted) {
     
    132140        public static void bind(String reference, RemoteObject remote) throws RemoteException {
    133141                try {
    134                         Properties environment = Environment.getEnvironment();
    135142                        remote.startRemoteObject(reference, environment);
    136143                        remoteObjs.put(reference, remote);
     
    244251                                                }
    245252                                                try {
    246                                                         Properties env = Environment.getEnvironment();
    247                                                         connection = OmqConnectionFactory.getNewWorkingConnection(env);
     253                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
    248254                                                        channel = connection.createChannel();
    249255                                                        addFaultTolerance();
     
    265271        }
    266272
     273        public static Properties getEnvironment() {
     274                return environment;
     275        }
     276
    267277}
  • trunk/src/main/java/omq/common/util/Log.java

    r44 r47  
    77import java.util.Properties;
    88
    9 import omq.exception.EnvironmentException;
     9import omq.common.broker.Broker;
    1010
    1111public class Log {
    1212
    1313        public static void saveLog(String processName, byte[] bytesResponse) throws IOException {
    14                 try {
    15                         Properties env = Environment.getEnvironment();
    1614
    17                         String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, "");
    18                         if (debugPath.length() > 0) {
    19                                 long timeNow = (new Date()).getTime();
     15                Properties env = Broker.getEnvironment();
    2016
    21                                 File outputFolder = new File(debugPath + File.separator);
    22                                 outputFolder.mkdirs();
    23                                
    24 //                              File outputFolder = new File(debugPath + File.separator + processName);
    25 //                              outputFolder.mkdirs();
     17                String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, "");
     18                if (debugPath.length() > 0) {
     19                        long timeNow = (new Date()).getTime();
    2620
    27 //                              File outputFileContent = new File(outputFolder.getAbsoluteFile() + File.separator + "content_" + timeNow);
    28 //                              FileOutputStream outputStream = new FileOutputStream(outputFileContent);
    29 //                              IOUtils.write(bytesResponse, outputStream);
    30 //                              outputStream.close();
     21                        File outputFolder = new File(debugPath + File.separator);
     22                        outputFolder.mkdirs();
    3123
    32                                 File outputFileLog = new File(debugPath + File.separator + "log");
    33                                 boolean exist = outputFileLog.exists();
    34                                
    35                                 FileWriter fw = new FileWriter(outputFileLog, true); // the true will append the new data
    36                                 if(!exist){
    37                                         fw.write("#ProcessName\t\tFile\t\t\t\t\tDate\t\t\tSize\n");
    38                                 }
    39                                 fw.write(processName + "\t" + "content_" + timeNow + "\t" + timeNow + "\t" + bytesResponse.length + "\tbytes\n");
    40                                 fw.close();
     24                        // File outputFolder = new File(debugPath + File.separator +
     25                        // processName);
     26                        // outputFolder.mkdirs();
     27
     28                        // File outputFileContent = new
     29                        // File(outputFolder.getAbsoluteFile() + File.separator +
     30                        // "content_" + timeNow);
     31                        // FileOutputStream outputStream = new
     32                        // FileOutputStream(outputFileContent);
     33                        // IOUtils.write(bytesResponse, outputStream);
     34                        // outputStream.close();
     35
     36                        File outputFileLog = new File(debugPath + File.separator + "log");
     37                        boolean exist = outputFileLog.exists();
     38
     39                        FileWriter fw = new FileWriter(outputFileLog, true); // the true
     40                                                                                                                                        // will
     41                                                                                                                                        // append
     42                                                                                                                                        // the
     43                                                                                                                                        // new
     44                                                                                                                                        // data
     45                        if (!exist) {
     46                                fw.write("#ProcessName\t\tFile\t\t\t\t\tDate\t\t\tSize\n");
    4147                        }
    42                 } catch (EnvironmentException e) {
    43                         throw new IOException(e.getMessage(), e);
     48                        fw.write(processName + "\t" + "content_" + timeNow + "\t" + timeNow + "\t" + bytesResponse.length + "\tbytes\n");
     49                        fw.close();
    4450                }
     51
    4552        }
    46        
     53
    4754        public static void saveTimeSendRequestLog(String processName, String coorId, String method, long timeNow) throws IOException {
    48                 try {
    49                         Properties env = Environment.getEnvironment();
    5055
    51                         String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, "");
    52                         if (debugPath.length() > 0) {                   
    53                                 File outputFolder = new File(debugPath + File.separator + processName);
    54                                 outputFolder.mkdirs();
    55                                
    56                                 File outputFileLog = new File(outputFolder + File.separator + "log");
    57                                 boolean exist = outputFileLog.exists();
    58                                
    59                                 FileWriter fw = new FileWriter(outputFileLog, true); // the true will append the new data
    60                                 if(!exist){
    61                                         fw.write("#CoorId\tMethod\tDate\n");
    62                                 }
    63                                 fw.write(coorId + "\t" + method + "\t" + timeNow + "\n");
    64                                 fw.close();
     56                Properties env = Broker.getEnvironment();
     57
     58                String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, "");
     59                if (debugPath.length() > 0) {
     60                        File outputFolder = new File(debugPath + File.separator + processName);
     61                        outputFolder.mkdirs();
     62
     63                        File outputFileLog = new File(outputFolder + File.separator + "log");
     64                        boolean exist = outputFileLog.exists();
     65
     66                        FileWriter fw = new FileWriter(outputFileLog, true); // the true
     67                                                                                                                                        // will
     68                                                                                                                                        // append
     69                                                                                                                                        // the
     70                                                                                                                                        // new
     71                                                                                                                                        // data
     72                        if (!exist) {
     73                                fw.write("#CoorId\tMethod\tDate\n");
    6574                        }
    66                 } catch (EnvironmentException e) {
    67                         throw new IOException(e.getMessage(), e);
     75                        fw.write(coorId + "\t" + method + "\t" + timeNow + "\n");
     76                        fw.close();
    6877                }
    69         }       
     78
     79        }
    7080}
  • trunk/src/main/java/omq/common/util/Serializer.java

    r44 r47  
    44import java.util.Properties;
    55
     6import omq.common.broker.Broker;
    67import omq.common.event.Event;
    78import omq.common.message.Request;
     
    1112import omq.common.util.Serializers.JavaImp;
    1213import omq.common.util.Serializers.KryoImp;
    13 import omq.exception.EnvironmentException;
    1414import omq.exception.SerializerException;
    1515import omq.server.RemoteObject;
     
    2121 */
    2222public class Serializer {
    23         public static String kryo = KryoImp.class.getCanonicalName();
    24         public static String java = JavaImp.class.getCanonicalName();
    25         public static String gson = GsonImp.class.getCanonicalName();
     23        public static String kryo = "kryo";
     24        public static String java = "java";
     25        public static String gson = "gson";
    2626
     27        // Client serializer
    2728        public static ISerializer serializer;
    2829
     30        // Server serializers
     31        private static ISerializer kryoSerializer;
     32        private static ISerializer javaSerializer;
     33        private static ISerializer gsonSerializer;
     34
    2935        private static Boolean getEnableCompression() {
    30                 Boolean enableCompression = false;
    31                 try {
    32                         Properties env = Environment.getEnvironment();
    33                         enableCompression = Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false"));
    34                 } catch (EnvironmentException e) {
    35                         e.printStackTrace();
    36                 }
    37 
    38                 return enableCompression;
     36                Properties env = Broker.getEnvironment();
     37                return Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false"));
    3938        }
    4039
     
    4241                if (serializer == null) {
    4342                        try {
    44                                 Properties env = Environment.getEnvironment();
     43                                Properties env = Broker.getEnvironment();
    4544                                String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
    4645
     
    4948                                }
    5049
    51                                 serializer = (ISerializer) Class.forName(className).newInstance();
     50                                serializer = getInstance(className);
    5251                        } catch (Exception ex) {
    5352                                throw new SerializerException(ex.getMessage(), ex);
     
    5857        }
    5958
     59        public static ISerializer getInstance(String type) throws SerializerException {
     60                if (kryo.equals(type)) {
     61                        if (kryoSerializer == null) {
     62                                kryoSerializer = new KryoImp();
     63                        }
     64                        return kryoSerializer;
     65                } else if (gson.endsWith(type)) {
     66                        if (gsonSerializer == null) {
     67                                gsonSerializer = new GsonImp();
     68                        }
     69                        return gsonSerializer;
     70                } else {
     71                        if (javaSerializer == null) {
     72                                javaSerializer = new JavaImp();
     73                        }
     74                        return javaSerializer;
     75                }
     76        }
     77
     78        public static byte[] serialize(String type, Object obj) throws SerializerException {
     79                ISerializer instance = getInstance(type);
     80
     81                Boolean enableCompression = getEnableCompression();
     82                if (enableCompression) {
     83                        byte[] objSerialized = instance.serialize(obj);
     84                        try {
     85                                return Zipper.zip(objSerialized);
     86                        } catch (IOException e) {
     87                                throw new SerializerException(e.getMessage(), e);
     88                        }
     89                } else {
     90                        return instance.serialize(obj);
     91                }
     92        }
     93
     94        // TODO: remove this function and think about the event serialization
    6095        public static byte[] serialize(Object obj) throws SerializerException {
    6196                ISerializer instance = getInstance();
     
    74109        }
    75110
    76         public static Request deserializeRequest(byte[] bytes, RemoteObject obj) throws SerializerException {
    77                 ISerializer instance = getInstance();
     111        public static Request deserializeRequest(String type, byte[] bytes, RemoteObject obj) throws SerializerException {
     112                ISerializer instance = getInstance(type);
    78113
    79114                Boolean enableCompression = getEnableCompression();
     
    121156                }
    122157        }
     158
     159        public static void removeSerializers() {
     160                serializer = null;
     161                kryoSerializer = null;
     162                javaSerializer = null;
     163                gsonSerializer = null;
     164        }
    123165}
  • trunk/src/main/java/omq/server/InvocationThread.java

    r44 r47  
    3535                                Delivery delivery = deliveryQueue.take();
    3636
     37                                String serializerType = delivery.getProperties().getType();
     38
    3739                                // Deserialize the json
    38                                 Request request = Serializer.deserializeRequest(delivery.getBody(), obj);
     40                                Request request = Serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
    3941                                // Log.saveLog("Server-Deserialize", delivery.getBody());
    4042
     
    6668                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
    6769
    68                                         byte[] bytesResponse = Serializer.serialize(resp);
     70                                        byte[] bytesResponse = Serializer.serialize(serializerType, resp);
    6971                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
    7072
Note: See TracChangeset for help on using the changeset viewer.