Changeset 47 for trunk/src


Ignore:
Timestamp:
06/18/13 16:51:22 (11 years ago)
Author:
stoda
Message:

Refactoring Environment class - deleted.
StopBroker? problems solved (?)
Server can receive send and receive messages in different formats.
Some tests modified

TODO: finish all the tests, add log4j

Location:
trunk/src
Files:
3 added
1 deleted
11 edited
1 copied

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r44 r47  
    4747
    4848        private String uid;
     49        private transient String serializerType;
    4950        private transient ResponseListener rListener;
    5051        private transient EventDispatcher dispatcher;
     
    8990                // this.channel = Broker.getChannel();
    9091                this.env = env;
     92
     93                // set the serializer type
     94                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
    9195
    9296                listeners = new HashMap<String, EventListener<?>>();
     
    150154
    151155                // Add the correlation ID and create a replyTo property
    152                 BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).build();
     156                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
    153157
    154158                // Publish the message
    155                 byte[] bytesRequest = Serializer.serialize(request);
     159                byte[] bytesRequest = Serializer.serialize(serializerType, request);
    156160                // TODO See this
    157161                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
     
    306310        }
    307311
     312        public static void stopProxy() {
     313                proxies = new HashMap<String, Object>();
     314        }
     315
     316        public static Map<String, Object> getProxies() {
     317                return proxies;
     318        }
     319
     320        public static void setProxies(Map<String, Object> proxies) {
     321                Proxymq.proxies = proxies;
     322        }
     323
    308324        @Override
    309325        public String getRef() {
  • trunk/src/main/java/omq/common/broker/Broker.java

    r44 r47  
    1212import omq.common.event.EventDispatcher;
    1313import omq.common.event.EventWrapper;
    14 import omq.common.util.Environment;
    1514import omq.common.util.OmqConnectionFactory;
    1615import omq.common.util.ParameterQueue;
     
    3332        private static boolean clientStarted = false;
    3433        private static boolean connectionClosed = false;
     34        private static Properties environment = null;
    3535        // TODO ask Pedro if it can be only one object in the map (an object can
    3636        // have multiple threads in the same broker -see environment-)
     
    4444         */
    4545        public static synchronized void initBroker(Properties env) throws Exception {
    46                 if (Environment.isVoid()) {
     46                if (environment == null) {
    4747                        remoteObjs = new HashMap<String, RemoteObject>();
    48                         Environment.setEnvironment(env);
     48                        environment = env;
    4949                        connection = OmqConnectionFactory.getNewConnection(env);
    5050                        channel = connection.createChannel();
     
    6767                        ResponseListener.stopResponseListner();
    6868                        EventDispatcher.stopEventDispatcher();
     69                        Proxymq.stopProxy();
    6970                }
    7071                // Stop all the remote objects working
     
    7273                        unbind(reference);
    7374                }
     75
    7476                // Close the connection once all the listeners are died
    7577                closeConnection();
     78
     79                clientStarted = false;
     80                connectionClosed = false;
     81                environment = null;
     82                remoteObjs = null;
     83                Serializer.removeSerializers();
    7684        }
    7785
     
    8795                connectionClosed = true;
    8896                connection.close();
     97                connectionClosed = false;
    8998        }
    9099
     
    111120        public static <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {
    112121                try {
    113                         Properties environment = Environment.getEnvironment();
    114122
    115123                        if (!clientStarted) {
     
    132140        public static void bind(String reference, RemoteObject remote) throws RemoteException {
    133141                try {
    134                         Properties environment = Environment.getEnvironment();
    135142                        remote.startRemoteObject(reference, environment);
    136143                        remoteObjs.put(reference, remote);
     
    244251                                                }
    245252                                                try {
    246                                                         Properties env = Environment.getEnvironment();
    247                                                         connection = OmqConnectionFactory.getNewWorkingConnection(env);
     253                                                        connection = OmqConnectionFactory.getNewWorkingConnection(environment);
    248254                                                        channel = connection.createChannel();
    249255                                                        addFaultTolerance();
     
    265271        }
    266272
     273        public static Properties getEnvironment() {
     274                return environment;
     275        }
     276
    267277}
  • trunk/src/main/java/omq/common/util/Log.java

    r44 r47  
    77import java.util.Properties;
    88
    9 import omq.exception.EnvironmentException;
     9import omq.common.broker.Broker;
    1010
    1111public class Log {
    1212
    1313        public static void saveLog(String processName, byte[] bytesResponse) throws IOException {
    14                 try {
    15                         Properties env = Environment.getEnvironment();
    1614
    17                         String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, "");
    18                         if (debugPath.length() > 0) {
    19                                 long timeNow = (new Date()).getTime();
     15                Properties env = Broker.getEnvironment();
    2016
    21                                 File outputFolder = new File(debugPath + File.separator);
    22                                 outputFolder.mkdirs();
    23                                
    24 //                              File outputFolder = new File(debugPath + File.separator + processName);
    25 //                              outputFolder.mkdirs();
     17                String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, "");
     18                if (debugPath.length() > 0) {
     19                        long timeNow = (new Date()).getTime();
    2620
    27 //                              File outputFileContent = new File(outputFolder.getAbsoluteFile() + File.separator + "content_" + timeNow);
    28 //                              FileOutputStream outputStream = new FileOutputStream(outputFileContent);
    29 //                              IOUtils.write(bytesResponse, outputStream);
    30 //                              outputStream.close();
     21                        File outputFolder = new File(debugPath + File.separator);
     22                        outputFolder.mkdirs();
    3123
    32                                 File outputFileLog = new File(debugPath + File.separator + "log");
    33                                 boolean exist = outputFileLog.exists();
    34                                
    35                                 FileWriter fw = new FileWriter(outputFileLog, true); // the true will append the new data
    36                                 if(!exist){
    37                                         fw.write("#ProcessName\t\tFile\t\t\t\t\tDate\t\t\tSize\n");
    38                                 }
    39                                 fw.write(processName + "\t" + "content_" + timeNow + "\t" + timeNow + "\t" + bytesResponse.length + "\tbytes\n");
    40                                 fw.close();
     24                        // File outputFolder = new File(debugPath + File.separator +
     25                        // processName);
     26                        // outputFolder.mkdirs();
     27
     28                        // File outputFileContent = new
     29                        // File(outputFolder.getAbsoluteFile() + File.separator +
     30                        // "content_" + timeNow);
     31                        // FileOutputStream outputStream = new
     32                        // FileOutputStream(outputFileContent);
     33                        // IOUtils.write(bytesResponse, outputStream);
     34                        // outputStream.close();
     35
     36                        File outputFileLog = new File(debugPath + File.separator + "log");
     37                        boolean exist = outputFileLog.exists();
     38
     39                        FileWriter fw = new FileWriter(outputFileLog, true); // the true
     40                                                                                                                                        // will
     41                                                                                                                                        // append
     42                                                                                                                                        // the
     43                                                                                                                                        // new
     44                                                                                                                                        // data
     45                        if (!exist) {
     46                                fw.write("#ProcessName\t\tFile\t\t\t\t\tDate\t\t\tSize\n");
    4147                        }
    42                 } catch (EnvironmentException e) {
    43                         throw new IOException(e.getMessage(), e);
     48                        fw.write(processName + "\t" + "content_" + timeNow + "\t" + timeNow + "\t" + bytesResponse.length + "\tbytes\n");
     49                        fw.close();
    4450                }
     51
    4552        }
    46        
     53
    4754        public static void saveTimeSendRequestLog(String processName, String coorId, String method, long timeNow) throws IOException {
    48                 try {
    49                         Properties env = Environment.getEnvironment();
    5055
    51                         String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, "");
    52                         if (debugPath.length() > 0) {                   
    53                                 File outputFolder = new File(debugPath + File.separator + processName);
    54                                 outputFolder.mkdirs();
    55                                
    56                                 File outputFileLog = new File(outputFolder + File.separator + "log");
    57                                 boolean exist = outputFileLog.exists();
    58                                
    59                                 FileWriter fw = new FileWriter(outputFileLog, true); // the true will append the new data
    60                                 if(!exist){
    61                                         fw.write("#CoorId\tMethod\tDate\n");
    62                                 }
    63                                 fw.write(coorId + "\t" + method + "\t" + timeNow + "\n");
    64                                 fw.close();
     56                Properties env = Broker.getEnvironment();
     57
     58                String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, "");
     59                if (debugPath.length() > 0) {
     60                        File outputFolder = new File(debugPath + File.separator + processName);
     61                        outputFolder.mkdirs();
     62
     63                        File outputFileLog = new File(outputFolder + File.separator + "log");
     64                        boolean exist = outputFileLog.exists();
     65
     66                        FileWriter fw = new FileWriter(outputFileLog, true); // the true
     67                                                                                                                                        // will
     68                                                                                                                                        // append
     69                                                                                                                                        // the
     70                                                                                                                                        // new
     71                                                                                                                                        // data
     72                        if (!exist) {
     73                                fw.write("#CoorId\tMethod\tDate\n");
    6574                        }
    66                 } catch (EnvironmentException e) {
    67                         throw new IOException(e.getMessage(), e);
     75                        fw.write(coorId + "\t" + method + "\t" + timeNow + "\n");
     76                        fw.close();
    6877                }
    69         }       
     78
     79        }
    7080}
  • trunk/src/main/java/omq/common/util/Serializer.java

    r44 r47  
    44import java.util.Properties;
    55
     6import omq.common.broker.Broker;
    67import omq.common.event.Event;
    78import omq.common.message.Request;
     
    1112import omq.common.util.Serializers.JavaImp;
    1213import omq.common.util.Serializers.KryoImp;
    13 import omq.exception.EnvironmentException;
    1414import omq.exception.SerializerException;
    1515import omq.server.RemoteObject;
     
    2121 */
    2222public class Serializer {
    23         public static String kryo = KryoImp.class.getCanonicalName();
    24         public static String java = JavaImp.class.getCanonicalName();
    25         public static String gson = GsonImp.class.getCanonicalName();
     23        public static String kryo = "kryo";
     24        public static String java = "java";
     25        public static String gson = "gson";
    2626
     27        // Client serializer
    2728        public static ISerializer serializer;
    2829
     30        // Server serializers
     31        private static ISerializer kryoSerializer;
     32        private static ISerializer javaSerializer;
     33        private static ISerializer gsonSerializer;
     34
    2935        private static Boolean getEnableCompression() {
    30                 Boolean enableCompression = false;
    31                 try {
    32                         Properties env = Environment.getEnvironment();
    33                         enableCompression = Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false"));
    34                 } catch (EnvironmentException e) {
    35                         e.printStackTrace();
    36                 }
    37 
    38                 return enableCompression;
     36                Properties env = Broker.getEnvironment();
     37                return Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false"));
    3938        }
    4039
     
    4241                if (serializer == null) {
    4342                        try {
    44                                 Properties env = Environment.getEnvironment();
     43                                Properties env = Broker.getEnvironment();
    4544                                String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
    4645
     
    4948                                }
    5049
    51                                 serializer = (ISerializer) Class.forName(className).newInstance();
     50                                serializer = getInstance(className);
    5251                        } catch (Exception ex) {
    5352                                throw new SerializerException(ex.getMessage(), ex);
     
    5857        }
    5958
     59        public static ISerializer getInstance(String type) throws SerializerException {
     60                if (kryo.equals(type)) {
     61                        if (kryoSerializer == null) {
     62                                kryoSerializer = new KryoImp();
     63                        }
     64                        return kryoSerializer;
     65                } else if (gson.endsWith(type)) {
     66                        if (gsonSerializer == null) {
     67                                gsonSerializer = new GsonImp();
     68                        }
     69                        return gsonSerializer;
     70                } else {
     71                        if (javaSerializer == null) {
     72                                javaSerializer = new JavaImp();
     73                        }
     74                        return javaSerializer;
     75                }
     76        }
     77
     78        public static byte[] serialize(String type, Object obj) throws SerializerException {
     79                ISerializer instance = getInstance(type);
     80
     81                Boolean enableCompression = getEnableCompression();
     82                if (enableCompression) {
     83                        byte[] objSerialized = instance.serialize(obj);
     84                        try {
     85                                return Zipper.zip(objSerialized);
     86                        } catch (IOException e) {
     87                                throw new SerializerException(e.getMessage(), e);
     88                        }
     89                } else {
     90                        return instance.serialize(obj);
     91                }
     92        }
     93
     94        // TODO: remove this function and think about the event serialization
    6095        public static byte[] serialize(Object obj) throws SerializerException {
    6196                ISerializer instance = getInstance();
     
    74109        }
    75110
    76         public static Request deserializeRequest(byte[] bytes, RemoteObject obj) throws SerializerException {
    77                 ISerializer instance = getInstance();
     111        public static Request deserializeRequest(String type, byte[] bytes, RemoteObject obj) throws SerializerException {
     112                ISerializer instance = getInstance(type);
    78113
    79114                Boolean enableCompression = getEnableCompression();
     
    121156                }
    122157        }
     158
     159        public static void removeSerializers() {
     160                serializer = null;
     161                kryoSerializer = null;
     162                javaSerializer = null;
     163                gsonSerializer = null;
     164        }
    123165}
  • trunk/src/main/java/omq/server/InvocationThread.java

    r44 r47  
    3535                                Delivery delivery = deliveryQueue.take();
    3636
     37                                String serializerType = delivery.getProperties().getType();
     38
    3739                                // Deserialize the json
    38                                 Request request = Serializer.deserializeRequest(delivery.getBody(), obj);
     40                                Request request = Serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
    3941                                // Log.saveLog("Server-Deserialize", delivery.getBody());
    4042
     
    6668                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
    6769
    68                                         byte[] bytesResponse = Serializer.serialize(resp);
     70                                        byte[] bytesResponse = Serializer.serialize(serializerType, resp);
    6971                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
    7072
  • trunk/src/test/java/omq/test/calculator/Calculator.java

    r46 r47  
    2323        public void asyncDivideByZero() throws IOException, SerializerException;
    2424
    25         @SyncMethod
     25        @SyncMethod(timeout = 1500)
    2626        public int divideByZero();
    2727
  • trunk/src/test/java/omq/test/calculator/ClientJava.java

    r46 r47  
    99import omq.common.util.Serializer;
    1010
     11import org.junit.AfterClass;
    1112import org.junit.BeforeClass;
    1213import org.junit.Test;
    1314
    14 public class ClientTest {
     15public class ClientJava {
    1516        private static Calculator remoteCalc;
    1617        private static Calculator remoteCalc2;
     
    4142                remoteCalc = (Calculator) Broker.lookup("calculator1", Calculator.class);
    4243                remoteCalc2 = (Calculator) Broker.lookup("calculator2", Calculator.class);
     44        }
     45
     46        @AfterClass
     47        public static void stop() throws Exception {
     48                Broker.stopBroker();
     49                Thread.sleep(1000);
    4350        }
    4451
  • trunk/src/test/java/omq/test/calculator/ClientTest.java

    r46 r47  
    11package omq.test.calculator;
    22
    3 import static org.junit.Assert.assertEquals;
     3import org.junit.runner.RunWith;
     4import org.junit.runners.Suite;
    45
    5 import java.util.Properties;
    6 
    7 import omq.common.broker.Broker;
    8 import omq.common.util.ParameterQueue;
    9 import omq.common.util.Serializer;
    10 
    11 import org.junit.BeforeClass;
    12 import org.junit.Test;
    13 
     6@RunWith(Suite.class)
     7@Suite.SuiteClasses({ ClientJava.class, ClientGson.class, ClientKryo.class })
    148public class ClientTest {
    15         private static Calculator remoteCalc;
    16         private static Calculator remoteCalc2;
    17 
    18         @BeforeClass
    19         public static void startClient() throws Exception {
    20                 Properties env = new Properties();
    21                 env.setProperty(ParameterQueue.USER_NAME, "guest");
    22                 env.setProperty(ParameterQueue.USER_PASS, "guest");
    23 
    24                 // Set host info of rabbimq (where it is)
    25                 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
    26                 env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    27                 env.setProperty(ParameterQueue.DURABLE_QUEUES, "false");
    28                 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
    29                 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");
    30 
    31                 // Set info about where the message will be sent
    32                 env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange");
    33                 // env.setProperty(ParameterQueue.DEBUGFILE, "c:\\middlewareDebug");
    34 
    35                 // Set info about the queue & the exchange where the ResponseListener
    36                 // will listen to.
    37                 env.setProperty(ParameterQueue.RPC_REPLY_QUEUE, "reply_queue");
    38                 env.setProperty(ParameterQueue.EVENT_REPLY_QUEUE, "event_queue");
    39 
    40                 Broker.initBroker(env);
    41                 remoteCalc = (Calculator) Broker.lookup("calculator1", Calculator.class);
    42                 remoteCalc2 = (Calculator) Broker.lookup("calculator2", Calculator.class);
    43         }
    44 
    45         @Test
    46         public void add() throws Exception {
    47                 int x = 10;
    48                 int y = 20;
    49 
    50                 int sync = remoteCalc.add(x, y);
    51                 int sum = x + y;
    52 
    53                 assertEquals(sum, sync);
    54         }
    55 
    56         @Test
    57         public void add2() throws Exception {
    58                 int x = 10;
    59                 int y = 20;
    60 
    61                 int sync = remoteCalc2.add(x, y);
    62                 int sum = x + y;
    63 
    64                 assertEquals(sum, sync);
    65         }
    66 
    67         @Test
    68         public void mult() throws Exception {
    69                 int x = 5;
    70                 int y = 15;
    71 
    72                 remoteCalc.mult(x, y);
    73                 Thread.sleep(200);
    74         }
    75 
    76         @Test
    77         public void notifyEvent() throws Exception {
    78                 ZeroListener zL = new ZeroListener("zero-event");
    79 
    80                 remoteCalc.addListener(zL);
    81 
    82                 remoteCalc.asyncDivideByZero();
    83 
    84                 Thread.sleep(200);
    85         }
    86 
    87         @Test
    88         public void sendMessage() throws Exception {
    89                 Message m = new Message(2334, "Hello objectmq");
    90                 remoteCalc.sendMessage(m);
    91         }
    92 
    93         @Test(expected = ArithmeticException.class)
    94         public void divideByZero() {
    95                 remoteCalc.divideByZero();
    96         }
    979}
  • trunk/src/test/java/omq/test/calculator/ServerTest.java

    r46 r47  
    22
    33import java.util.Properties;
     4
     5import org.junit.Test;
    46
    57import omq.common.broker.Broker;
     
    810
    911public class ServerTest {
    10         private static CalculatorImpl calc;
    11         private static CalculatorImpl calc2;
    1212
    13         public static void main(String[] args) throws Exception {
     13        private CalculatorImpl calc;
     14        private CalculatorImpl calc2;
     15
     16        @Test
     17        public void serverTest() throws Exception {
    1418                Properties env = new Properties();
    1519                env.setProperty(ParameterQueue.USER_NAME, "guest");
     
    3539
    3640                System.out.println("Server started");
     41
     42                Thread.sleep(60 * 60 * 1000);
    3743        }
    3844}
  • trunk/src/test/java/omq/test/exception/ClientTest.java

    r46 r47  
    44
    55import java.lang.reflect.UndeclaredThrowableException;
     6import java.util.Arrays;
     7import java.util.Collection;
    68import java.util.Properties;
    79
     
    1012import omq.common.util.Serializer;
    1113
    12 import org.junit.BeforeClass;
     14import org.junit.After;
    1315import org.junit.Test;
     16import org.junit.runner.RunWith;
     17import org.junit.runners.Parameterized;
     18import org.junit.runners.Parameterized.Parameters;
    1419
     20@RunWith(value = Parameterized.class)
    1521public class ClientTest {
    16         private static ClientInterface client;
     22        private ClientInterface client;
    1723
    18         @BeforeClass
    19         public static void startClient() throws Exception {
     24        public ClientTest(String type) throws Exception {
    2025                Properties env = new Properties();
    2126                env.setProperty(ParameterQueue.USER_NAME, "guest");
     
    2631                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    2732                env.setProperty(ParameterQueue.DURABLE_QUEUES, "false");
    28                 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.kryo);
     33                env.setProperty(ParameterQueue.SERIALIZER_NAME, type);
    2934                env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");
    3035
     
    3944                Broker.initBroker(env);
    4045                client = (ClientInterface) Broker.lookup("server", ClientInterface.class);
     46        }
     47
     48        @Parameters
     49        public static Collection<Object[]> data() {
     50                Object[][] data = new Object[][] { { Serializer.java }, { Serializer.gson }, { Serializer.kryo } };
     51                return Arrays.asList(data);
     52        }
     53
     54        @After
     55        public void stop() throws Exception {
     56                Broker.stopBroker();
    4157        }
    4258
  • trunk/src/test/java/omq/test/exception/ServerTest.java

    r46 r47  
    55import omq.common.broker.Broker;
    66import omq.common.util.ParameterQueue;
    7 import omq.common.util.Serializer;
     7
     8import org.junit.Test;
    89
    910public class ServerTest {
    1011
    11         public static void main(String[] args) throws Exception {
     12        @Test
     13        public void test() throws Exception {
    1214                Properties env = new Properties();
    1315                env.setProperty(ParameterQueue.USER_NAME, "guest");
     
    1820                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    1921                env.setProperty(ParameterQueue.DURABLE_QUEUES, "false");
    20                 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.kryo);
    2122                env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");
    2223
     
    2930                Broker.initBroker(env);
    3031                Broker.bind("server", server);
     32
     33                Thread.sleep(60 * 60 * 1000);
    3134        }
    3235}
  • trunk/src/test/java/omq/test/faultTolerance/ServerTest.java

    r46 r47  
    22
    33import java.util.Properties;
     4
     5import org.junit.Test;
    46
    57import omq.common.broker.Broker;
     
    1113        private static CalculatorImpl calc;
    1214
    13         public static void main(String[] args) throws Exception {
     15        @Test
     16        public void test() throws Exception {
    1417                Properties env = new Properties();
    1518                env.setProperty(ParameterQueue.USER_NAME, "guest");
     
    3336
    3437                System.out.println("Server started");
     38               
     39                Thread.sleep(60 * 1000);
    3540        }
    3641}
Note: See TracChangeset for help on using the changeset viewer.