Changeset 47
- Timestamp:
- 06/18/13 16:51:22 (11 years ago)
- Location:
- trunk
- Files:
-
- 13 added
- 1 deleted
- 12 edited
- 1 copied
Legend:
- Unmodified
- Added
- Removed
-
trunk/.classpath
r46 r47 23 23 </attributes> 24 24 </classpathentry> 25 <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1. 6">25 <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"> 26 26 <attributes> 27 27 <attribute name="maven.pomderived" value="true"/> -
trunk/src/main/java/omq/client/proxy/Proxymq.java
r44 r47 47 47 48 48 private String uid; 49 private transient String serializerType; 49 50 private transient ResponseListener rListener; 50 51 private transient EventDispatcher dispatcher; … … 89 90 // this.channel = Broker.getChannel(); 90 91 this.env = env; 92 93 // set the serializer type 94 serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java); 91 95 92 96 listeners = new HashMap<String, EventListener<?>>(); … … 150 154 151 155 // Add the correlation ID and create a replyTo property 152 BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName). build();156 BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build(); 153 157 154 158 // Publish the message 155 byte[] bytesRequest = Serializer.serialize( request);159 byte[] bytesRequest = Serializer.serialize(serializerType, request); 156 160 // TODO See this 157 161 // channel.basicPublish(exchange, routingkey, props, bytesRequest); … … 306 310 } 307 311 312 public static void stopProxy() { 313 proxies = new HashMap<String, Object>(); 314 } 315 316 public static Map<String, Object> getProxies() { 317 return proxies; 318 } 319 320 public static void setProxies(Map<String, Object> proxies) { 321 Proxymq.proxies = proxies; 322 } 323 308 324 @Override 309 325 public String getRef() { -
trunk/src/main/java/omq/common/broker/Broker.java
r44 r47 12 12 import omq.common.event.EventDispatcher; 13 13 import omq.common.event.EventWrapper; 14 import omq.common.util.Environment;15 14 import omq.common.util.OmqConnectionFactory; 16 15 import omq.common.util.ParameterQueue; … … 33 32 private static boolean clientStarted = false; 34 33 private static boolean connectionClosed = false; 34 private static Properties environment = null; 35 35 // TODO ask Pedro if it can be only one object in the map (an object can 36 36 // have multiple threads in the same broker -see environment-) … … 44 44 */ 45 45 public static synchronized void initBroker(Properties env) throws Exception { 46 if ( Environment.isVoid()) {46 if (environment == null) { 47 47 remoteObjs = new HashMap<String, RemoteObject>(); 48 Environment.setEnvironment(env);48 environment = env; 49 49 connection = OmqConnectionFactory.getNewConnection(env); 50 50 channel = connection.createChannel(); … … 67 67 ResponseListener.stopResponseListner(); 68 68 EventDispatcher.stopEventDispatcher(); 69 Proxymq.stopProxy(); 69 70 } 70 71 // Stop all the remote objects working … … 72 73 unbind(reference); 73 74 } 75 74 76 // Close the connection once all the listeners are died 75 77 closeConnection(); 78 79 clientStarted = false; 80 connectionClosed = false; 81 environment = null; 82 remoteObjs = null; 83 Serializer.removeSerializers(); 76 84 } 77 85 … … 87 95 connectionClosed = true; 88 96 connection.close(); 97 connectionClosed = false; 89 98 } 90 99 … … 111 120 public static <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException { 112 121 try { 113 Properties environment = Environment.getEnvironment();114 122 115 123 if (!clientStarted) { … … 132 140 public static void bind(String reference, RemoteObject remote) throws RemoteException { 133 141 try { 134 Properties environment = Environment.getEnvironment();135 142 remote.startRemoteObject(reference, environment); 136 143 remoteObjs.put(reference, remote); … … 244 251 } 245 252 try { 246 Properties env = Environment.getEnvironment(); 247 connection = OmqConnectionFactory.getNewWorkingConnection(env); 253 connection = OmqConnectionFactory.getNewWorkingConnection(environment); 248 254 channel = connection.createChannel(); 249 255 addFaultTolerance(); … … 265 271 } 266 272 273 public static Properties getEnvironment() { 274 return environment; 275 } 276 267 277 } -
trunk/src/main/java/omq/common/util/Log.java
r44 r47 7 7 import java.util.Properties; 8 8 9 import omq. exception.EnvironmentException;9 import omq.common.broker.Broker; 10 10 11 11 public class Log { 12 12 13 13 public static void saveLog(String processName, byte[] bytesResponse) throws IOException { 14 try {15 Properties env = Environment.getEnvironment();16 14 17 String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, ""); 18 if (debugPath.length() > 0) { 19 long timeNow = (new Date()).getTime(); 15 Properties env = Broker.getEnvironment(); 20 16 21 File outputFolder = new File(debugPath + File.separator); 22 outputFolder.mkdirs(); 23 24 // File outputFolder = new File(debugPath + File.separator + processName); 25 // outputFolder.mkdirs(); 17 String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, ""); 18 if (debugPath.length() > 0) { 19 long timeNow = (new Date()).getTime(); 26 20 27 // File outputFileContent = new File(outputFolder.getAbsoluteFile() + File.separator + "content_" + timeNow); 28 // FileOutputStream outputStream = new FileOutputStream(outputFileContent); 29 // IOUtils.write(bytesResponse, outputStream); 30 // outputStream.close(); 21 File outputFolder = new File(debugPath + File.separator); 22 outputFolder.mkdirs(); 31 23 32 File outputFileLog = new File(debugPath + File.separator + "log"); 33 boolean exist = outputFileLog.exists(); 34 35 FileWriter fw = new FileWriter(outputFileLog, true); // the true will append the new data 36 if(!exist){ 37 fw.write("#ProcessName\t\tFile\t\t\t\t\tDate\t\t\tSize\n"); 38 } 39 fw.write(processName + "\t" + "content_" + timeNow + "\t" + timeNow + "\t" + bytesResponse.length + "\tbytes\n"); 40 fw.close(); 24 // File outputFolder = new File(debugPath + File.separator + 25 // processName); 26 // outputFolder.mkdirs(); 27 28 // File outputFileContent = new 29 // File(outputFolder.getAbsoluteFile() + File.separator + 30 // "content_" + timeNow); 31 // FileOutputStream outputStream = new 32 // FileOutputStream(outputFileContent); 33 // IOUtils.write(bytesResponse, outputStream); 34 // outputStream.close(); 35 36 File outputFileLog = new File(debugPath + File.separator + "log"); 37 boolean exist = outputFileLog.exists(); 38 39 FileWriter fw = new FileWriter(outputFileLog, true); // the true 40 // will 41 // append 42 // the 43 // new 44 // data 45 if (!exist) { 46 fw.write("#ProcessName\t\tFile\t\t\t\t\tDate\t\t\tSize\n"); 41 47 } 42 } catch (EnvironmentException e) {43 throw new IOException(e.getMessage(), e);48 fw.write(processName + "\t" + "content_" + timeNow + "\t" + timeNow + "\t" + bytesResponse.length + "\tbytes\n"); 49 fw.close(); 44 50 } 51 45 52 } 46 53 47 54 public static void saveTimeSendRequestLog(String processName, String coorId, String method, long timeNow) throws IOException { 48 try {49 Properties env = Environment.getEnvironment();50 55 51 String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, ""); 52 if (debugPath.length() > 0) { 53 File outputFolder = new File(debugPath + File.separator + processName); 54 outputFolder.mkdirs(); 55 56 File outputFileLog = new File(outputFolder + File.separator + "log"); 57 boolean exist = outputFileLog.exists(); 58 59 FileWriter fw = new FileWriter(outputFileLog, true); // the true will append the new data 60 if(!exist){ 61 fw.write("#CoorId\tMethod\tDate\n"); 62 } 63 fw.write(coorId + "\t" + method + "\t" + timeNow + "\n"); 64 fw.close(); 56 Properties env = Broker.getEnvironment(); 57 58 String debugPath = env.getProperty(ParameterQueue.DEBUGFILE, ""); 59 if (debugPath.length() > 0) { 60 File outputFolder = new File(debugPath + File.separator + processName); 61 outputFolder.mkdirs(); 62 63 File outputFileLog = new File(outputFolder + File.separator + "log"); 64 boolean exist = outputFileLog.exists(); 65 66 FileWriter fw = new FileWriter(outputFileLog, true); // the true 67 // will 68 // append 69 // the 70 // new 71 // data 72 if (!exist) { 73 fw.write("#CoorId\tMethod\tDate\n"); 65 74 } 66 } catch (EnvironmentException e) {67 throw new IOException(e.getMessage(), e);75 fw.write(coorId + "\t" + method + "\t" + timeNow + "\n"); 76 fw.close(); 68 77 } 69 } 78 79 } 70 80 } -
trunk/src/main/java/omq/common/util/Serializer.java
r44 r47 4 4 import java.util.Properties; 5 5 6 import omq.common.broker.Broker; 6 7 import omq.common.event.Event; 7 8 import omq.common.message.Request; … … 11 12 import omq.common.util.Serializers.JavaImp; 12 13 import omq.common.util.Serializers.KryoImp; 13 import omq.exception.EnvironmentException;14 14 import omq.exception.SerializerException; 15 15 import omq.server.RemoteObject; … … 21 21 */ 22 22 public class Serializer { 23 public static String kryo = KryoImp.class.getCanonicalName();24 public static String java = JavaImp.class.getCanonicalName();25 public static String gson = GsonImp.class.getCanonicalName();23 public static String kryo = "kryo"; 24 public static String java = "java"; 25 public static String gson = "gson"; 26 26 27 // Client serializer 27 28 public static ISerializer serializer; 28 29 30 // Server serializers 31 private static ISerializer kryoSerializer; 32 private static ISerializer javaSerializer; 33 private static ISerializer gsonSerializer; 34 29 35 private static Boolean getEnableCompression() { 30 Boolean enableCompression = false; 31 try { 32 Properties env = Environment.getEnvironment(); 33 enableCompression = Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false")); 34 } catch (EnvironmentException e) { 35 e.printStackTrace(); 36 } 37 38 return enableCompression; 36 Properties env = Broker.getEnvironment(); 37 return Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false")); 39 38 } 40 39 … … 42 41 if (serializer == null) { 43 42 try { 44 Properties env = Environment.getEnvironment();43 Properties env = Broker.getEnvironment(); 45 44 String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java); 46 45 … … 49 48 } 50 49 51 serializer = (ISerializer) Class.forName(className).newInstance();50 serializer = getInstance(className); 52 51 } catch (Exception ex) { 53 52 throw new SerializerException(ex.getMessage(), ex); … … 58 57 } 59 58 59 public static ISerializer getInstance(String type) throws SerializerException { 60 if (kryo.equals(type)) { 61 if (kryoSerializer == null) { 62 kryoSerializer = new KryoImp(); 63 } 64 return kryoSerializer; 65 } else if (gson.endsWith(type)) { 66 if (gsonSerializer == null) { 67 gsonSerializer = new GsonImp(); 68 } 69 return gsonSerializer; 70 } else { 71 if (javaSerializer == null) { 72 javaSerializer = new JavaImp(); 73 } 74 return javaSerializer; 75 } 76 } 77 78 public static byte[] serialize(String type, Object obj) throws SerializerException { 79 ISerializer instance = getInstance(type); 80 81 Boolean enableCompression = getEnableCompression(); 82 if (enableCompression) { 83 byte[] objSerialized = instance.serialize(obj); 84 try { 85 return Zipper.zip(objSerialized); 86 } catch (IOException e) { 87 throw new SerializerException(e.getMessage(), e); 88 } 89 } else { 90 return instance.serialize(obj); 91 } 92 } 93 94 // TODO: remove this function and think about the event serialization 60 95 public static byte[] serialize(Object obj) throws SerializerException { 61 96 ISerializer instance = getInstance(); … … 74 109 } 75 110 76 public static Request deserializeRequest( byte[] bytes, RemoteObject obj) throws SerializerException {77 ISerializer instance = getInstance( );111 public static Request deserializeRequest(String type, byte[] bytes, RemoteObject obj) throws SerializerException { 112 ISerializer instance = getInstance(type); 78 113 79 114 Boolean enableCompression = getEnableCompression(); … … 121 156 } 122 157 } 158 159 public static void removeSerializers() { 160 serializer = null; 161 kryoSerializer = null; 162 javaSerializer = null; 163 gsonSerializer = null; 164 } 123 165 } -
trunk/src/main/java/omq/server/InvocationThread.java
r44 r47 35 35 Delivery delivery = deliveryQueue.take(); 36 36 37 String serializerType = delivery.getProperties().getType(); 38 37 39 // Deserialize the json 38 Request request = Serializer.deserializeRequest( delivery.getBody(), obj);40 Request request = Serializer.deserializeRequest(serializerType, delivery.getBody(), obj); 39 41 // Log.saveLog("Server-Deserialize", delivery.getBody()); 40 42 … … 66 68 BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build(); 67 69 68 byte[] bytesResponse = Serializer.serialize( resp);70 byte[] bytesResponse = Serializer.serialize(serializerType, resp); 69 71 channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse); 70 72 -
trunk/src/test/java/omq/test/calculator/Calculator.java
r46 r47 23 23 public void asyncDivideByZero() throws IOException, SerializerException; 24 24 25 @SyncMethod 25 @SyncMethod(timeout = 1500) 26 26 public int divideByZero(); 27 27 -
trunk/src/test/java/omq/test/calculator/ClientJava.java
r46 r47 9 9 import omq.common.util.Serializer; 10 10 11 import org.junit.AfterClass; 11 12 import org.junit.BeforeClass; 12 13 import org.junit.Test; 13 14 14 public class Client Test{15 public class ClientJava { 15 16 private static Calculator remoteCalc; 16 17 private static Calculator remoteCalc2; … … 41 42 remoteCalc = (Calculator) Broker.lookup("calculator1", Calculator.class); 42 43 remoteCalc2 = (Calculator) Broker.lookup("calculator2", Calculator.class); 44 } 45 46 @AfterClass 47 public static void stop() throws Exception { 48 Broker.stopBroker(); 49 Thread.sleep(1000); 43 50 } 44 51 -
trunk/src/test/java/omq/test/calculator/ClientTest.java
r46 r47 1 1 package omq.test.calculator; 2 2 3 import static org.junit.Assert.assertEquals; 3 import org.junit.runner.RunWith; 4 import org.junit.runners.Suite; 4 5 5 import java.util.Properties; 6 7 import omq.common.broker.Broker; 8 import omq.common.util.ParameterQueue; 9 import omq.common.util.Serializer; 10 11 import org.junit.BeforeClass; 12 import org.junit.Test; 13 6 @RunWith(Suite.class) 7 @Suite.SuiteClasses({ ClientJava.class, ClientGson.class, ClientKryo.class }) 14 8 public class ClientTest { 15 private static Calculator remoteCalc;16 private static Calculator remoteCalc2;17 18 @BeforeClass19 public static void startClient() throws Exception {20 Properties env = new Properties();21 env.setProperty(ParameterQueue.USER_NAME, "guest");22 env.setProperty(ParameterQueue.USER_PASS, "guest");23 24 // Set host info of rabbimq (where it is)25 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");26 env.setProperty(ParameterQueue.SERVER_PORT, "5672");27 env.setProperty(ParameterQueue.DURABLE_QUEUES, "false");28 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);29 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");30 31 // Set info about where the message will be sent32 env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange");33 // env.setProperty(ParameterQueue.DEBUGFILE, "c:\\middlewareDebug");34 35 // Set info about the queue & the exchange where the ResponseListener36 // will listen to.37 env.setProperty(ParameterQueue.RPC_REPLY_QUEUE, "reply_queue");38 env.setProperty(ParameterQueue.EVENT_REPLY_QUEUE, "event_queue");39 40 Broker.initBroker(env);41 remoteCalc = (Calculator) Broker.lookup("calculator1", Calculator.class);42 remoteCalc2 = (Calculator) Broker.lookup("calculator2", Calculator.class);43 }44 45 @Test46 public void add() throws Exception {47 int x = 10;48 int y = 20;49 50 int sync = remoteCalc.add(x, y);51 int sum = x + y;52 53 assertEquals(sum, sync);54 }55 56 @Test57 public void add2() throws Exception {58 int x = 10;59 int y = 20;60 61 int sync = remoteCalc2.add(x, y);62 int sum = x + y;63 64 assertEquals(sum, sync);65 }66 67 @Test68 public void mult() throws Exception {69 int x = 5;70 int y = 15;71 72 remoteCalc.mult(x, y);73 Thread.sleep(200);74 }75 76 @Test77 public void notifyEvent() throws Exception {78 ZeroListener zL = new ZeroListener("zero-event");79 80 remoteCalc.addListener(zL);81 82 remoteCalc.asyncDivideByZero();83 84 Thread.sleep(200);85 }86 87 @Test88 public void sendMessage() throws Exception {89 Message m = new Message(2334, "Hello objectmq");90 remoteCalc.sendMessage(m);91 }92 93 @Test(expected = ArithmeticException.class)94 public void divideByZero() {95 remoteCalc.divideByZero();96 }97 9 } -
trunk/src/test/java/omq/test/calculator/ServerTest.java
r46 r47 2 2 3 3 import java.util.Properties; 4 5 import org.junit.Test; 4 6 5 7 import omq.common.broker.Broker; … … 8 10 9 11 public class ServerTest { 10 private static CalculatorImpl calc;11 private static CalculatorImpl calc2;12 12 13 public static void main(String[] args) throws Exception { 13 private CalculatorImpl calc; 14 private CalculatorImpl calc2; 15 16 @Test 17 public void serverTest() throws Exception { 14 18 Properties env = new Properties(); 15 19 env.setProperty(ParameterQueue.USER_NAME, "guest"); … … 35 39 36 40 System.out.println("Server started"); 41 42 Thread.sleep(60 * 60 * 1000); 37 43 } 38 44 } -
trunk/src/test/java/omq/test/exception/ClientTest.java
r46 r47 4 4 5 5 import java.lang.reflect.UndeclaredThrowableException; 6 import java.util.Arrays; 7 import java.util.Collection; 6 8 import java.util.Properties; 7 9 … … 10 12 import omq.common.util.Serializer; 11 13 12 import org.junit. BeforeClass;14 import org.junit.After; 13 15 import org.junit.Test; 16 import org.junit.runner.RunWith; 17 import org.junit.runners.Parameterized; 18 import org.junit.runners.Parameterized.Parameters; 14 19 20 @RunWith(value = Parameterized.class) 15 21 public class ClientTest { 16 private staticClientInterface client;22 private ClientInterface client; 17 23 18 @BeforeClass 19 public static void startClient() throws Exception { 24 public ClientTest(String type) throws Exception { 20 25 Properties env = new Properties(); 21 26 env.setProperty(ParameterQueue.USER_NAME, "guest"); … … 26 31 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 27 32 env.setProperty(ParameterQueue.DURABLE_QUEUES, "false"); 28 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.kryo);33 env.setProperty(ParameterQueue.SERIALIZER_NAME, type); 29 34 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false"); 30 35 … … 39 44 Broker.initBroker(env); 40 45 client = (ClientInterface) Broker.lookup("server", ClientInterface.class); 46 } 47 48 @Parameters 49 public static Collection<Object[]> data() { 50 Object[][] data = new Object[][] { { Serializer.java }, { Serializer.gson }, { Serializer.kryo } }; 51 return Arrays.asList(data); 52 } 53 54 @After 55 public void stop() throws Exception { 56 Broker.stopBroker(); 41 57 } 42 58 -
trunk/src/test/java/omq/test/exception/ServerTest.java
r46 r47 5 5 import omq.common.broker.Broker; 6 6 import omq.common.util.ParameterQueue; 7 import omq.common.util.Serializer; 7 8 import org.junit.Test; 8 9 9 10 public class ServerTest { 10 11 11 public static void main(String[] args) throws Exception { 12 @Test 13 public void test() throws Exception { 12 14 Properties env = new Properties(); 13 15 env.setProperty(ParameterQueue.USER_NAME, "guest"); … … 18 20 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 19 21 env.setProperty(ParameterQueue.DURABLE_QUEUES, "false"); 20 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.kryo);21 22 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false"); 22 23 … … 29 30 Broker.initBroker(env); 30 31 Broker.bind("server", server); 32 33 Thread.sleep(60 * 60 * 1000); 31 34 } 32 35 } -
trunk/src/test/java/omq/test/faultTolerance/ServerTest.java
r46 r47 2 2 3 3 import java.util.Properties; 4 5 import org.junit.Test; 4 6 5 7 import omq.common.broker.Broker; … … 11 13 private static CalculatorImpl calc; 12 14 13 public static void main(String[] args) throws Exception { 15 @Test 16 public void test() throws Exception { 14 17 Properties env = new Properties(); 15 18 env.setProperty(ParameterQueue.USER_NAME, "guest"); … … 33 36 34 37 System.out.println("Server started"); 38 39 Thread.sleep(60 * 1000); 35 40 } 36 41 }
Note: See TracChangeset
for help on using the changeset viewer.