Changeset 38
- Timestamp:
- 06/13/13 17:19:52 (11 years ago)
- Location:
- trunk/objectmq
- Files:
-
- 8 added
- 11 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/objectmq/src/omq/client/proxy/Proxymq.java
r36 r38 24 24 import omq.common.util.Serializer; 25 25 import omq.exception.NoContainsInstanceException; 26 import omq.exception.OmqException; 26 27 import omq.exception.RetryException; 27 28 import omq.exception.SerializerException; … … 234 235 } 235 236 237 if (resp.getError() != null) { 238 OmqException error = resp.getError(); 239 String name = error.getType();System.out.println("Name: "+name); 240 String message = error.getMessage(); 241 throw (Exception) Class.forName(name).getConstructor(String.class).newInstance(message); 242 } 243 236 244 return resp.getResult(); 237 245 } -
trunk/objectmq/src/omq/common/broker/Broker.java
r37 r38 196 196 */ 197 197 public static void tryConnection(Properties env) throws Exception { 198 System.out.println("hola");199 198 Channel channel = connection.createChannel(); 200 199 String message = "ping"; … … 221 220 222 221 if (!message.equalsIgnoreCase(new String(delivery.getBody()))) { 223 throw new IOException("Ping -ponginitialitzation has failed");222 throw new IOException("Ping initialitzation has failed"); 224 223 } 225 224 } -
trunk/objectmq/src/omq/common/message/Request.java
r9 r38 10 10 private static final long serialVersionUID = 6366255840200365083L; 11 11 12 private String method; 13 private Object[] params; 12 14 private String id; 13 private String method;14 15 private boolean async = false; 15 private Object[] arguments;16 16 17 17 private transient long timeout; … … 21 21 } 22 22 23 public Request(String id, String method, Object[] arguments) {23 public Request(String id, String method, Object[] params) { 24 24 this.id = id; 25 25 this.method = method; 26 this. arguments = arguments;26 this.params = params; 27 27 } 28 28 29 private Request(String id, String method, boolean async, Object[] arguments) {29 private Request(String id, String method, boolean async, Object[] params) { 30 30 this.id = id; 31 31 this.method = method; 32 32 this.async = async; 33 this. arguments = arguments;33 this.params = params; 34 34 } 35 35 36 public static Request newSyncRequest(String id, String method, Object[] arguments) {37 return new Request(id, method, false, arguments);36 public static Request newSyncRequest(String id, String method, Object[] params) { 37 return new Request(id, method, false, params); 38 38 } 39 39 40 public static Request newSyncRequest(String id, String method, Object[] arguments, int retries, long timeout) {41 Request req = new Request(id, method, false, arguments);40 public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout) { 41 Request req = new Request(id, method, false, params); 42 42 req.setRetries(retries); 43 43 req.setTimeout(timeout); … … 45 45 } 46 46 47 public static Request newAsyncRequest(String id, String method, Object[] arguments) {48 return new Request(id, method, true, arguments);47 public static Request newAsyncRequest(String id, String method, Object[] params) { 48 return new Request(id, method, true, params); 49 49 } 50 50 … … 65 65 } 66 66 67 public Object[] get Arguments() {68 return arguments;67 public Object[] getParams() { 68 return params; 69 69 } 70 70 71 public void set Arguments(Object[] arguments) {72 this. arguments = arguments;71 public void setParams(Object[] params) { 72 this.params = params; 73 73 } 74 74 -
trunk/objectmq/src/omq/common/message/Response.java
r9 r38 2 2 3 3 import java.io.Serializable; 4 5 import omq.exception.OmqException; 4 6 5 7 public class Response implements Serializable { … … 10 12 private static final long serialVersionUID = 3368363997012527189L; 11 13 14 private Object result; 15 private OmqException error; 12 16 private String id; 13 17 private String idOmq; 14 private Object result;15 18 16 19 public Response() { 17 20 } 18 21 19 public Response(String id, String idOmq, Object result ) {22 public Response(String id, String idOmq, Object result, OmqException error) { 20 23 this.id = id; 21 24 this.idOmq = idOmq; 22 25 this.result = result; 26 this.error = error; 23 27 } 24 28 … … 47 51 } 48 52 53 public OmqException getError() { 54 return error; 55 } 56 57 public void setError(OmqException error) { 58 this.error = error; 59 } 60 49 61 } -
trunk/objectmq/src/omq/common/util/Serializers/GsonImp.java
r34 r38 6 6 import omq.common.message.Request; 7 7 import omq.common.message.Response; 8 import omq.exception.OmqException; 8 9 import omq.exception.SerializerException; 9 10 import omq.server.RemoteObject; … … 69 70 Object result = gson.fromJson(jsonElement, type); 70 71 71 return new Response(id, idOmq, result); 72 JsonElement jsonError = jsonObj.get("error"); 73 OmqException error = gson.fromJson(jsonError, OmqException.class); 74 75 return new Response(id, idOmq, result, error); 72 76 } 73 77 … … 77 81 String json = new String(bytes); 78 82 System.out.println(json); 79 83 80 84 JsonParser parser = new JsonParser(); 81 85 JsonObject jsonObj = parser.parse(json).getAsJsonObject(); -
trunk/objectmq/src/omq/server/InvocationThread.java
r37 r38 1 1 package omq.server; 2 2 3 import java.lang.reflect.InvocationTargetException; 3 4 import java.util.concurrent.BlockingQueue; 4 5 … … 6 7 import omq.common.message.Response; 7 8 import omq.common.util.Serializer; 9 import omq.exception.OmqException; 8 10 9 11 import com.rabbitmq.client.AMQP.BasicProperties; … … 35 37 // Deserialize the json 36 38 Request request = Serializer.deserializeRequest(delivery.getBody(), obj); 37 // Log.saveLog("Server-Deserialize", delivery.getBody());39 // Log.saveLog("Server-Deserialize", delivery.getBody()); 38 40 39 41 String methodName = request.getMethod(); … … 42 44 System.out.println("Invoke method: " + methodName + " CorrID: " + requestID); 43 45 44 // Invoke the method 45 Object result = obj.invokeMethod(request.getMethod(), 46 request.getArguments()); 46 // Invoke the method 47 Object result = null; 48 OmqException error = null; 49 try { 50 result = obj.invokeMethod(request.getMethod(), request.getParams()); 51 } catch (InvocationTargetException e) { 52 Throwable throwable = e.getTargetException(); 53 error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage()); 54 } catch (NoSuchMethodException e) { 55 error = new OmqException(e.getClass().getCanonicalName(), e.getMessage()); 56 } 47 57 58 // Reply if it's necessary 48 59 if (!request.isAsync()) { 49 Response resp = new Response(request.getId(), obj.getRef(), result );60 Response resp = new Response(request.getId(), obj.getRef(), result, error); 50 61 51 62 Channel channel = obj.getChannel(); … … 58 69 channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse); 59 70 60 // Log.saveLog("Server-Serialize", bytesResponse);71 // Log.saveLog("Server-Serialize", bytesResponse); 61 72 } 62 73 … … 69 80 } 70 81 71 }System.out.println("Invocation Thread dies!!"); 82 } 83 System.out.println("Invocation Thread dies!!"); 72 84 } 73 85 -
trunk/objectmq/src/omq/server/RemoteObject.java
r37 r38 166 166 167 167 private Method loadMethodWithPrimitives(String methodName, Class<?>[] argArray) throws NoSuchMethodException { 168 Method[] methods = this.getClass().getMethods(); 169 int length = argArray.length; 170 171 for (Method method : methods) { 172 String name = method.getName(); 173 int argsLength = method.getParameterTypes().length; 174 175 if (name.equals(methodName) && length == argsLength) { 176 // This array can have primitive types inside 177 Class<?>[] params = method.getParameterTypes(); 178 179 boolean found = true; 180 181 for (int i = 0; i < length; i++) { 182 if (params[i].isPrimitive()) { 183 Class<?> paramWrapper = primitiveClasses.get(params[i].getName()); 184 185 if (!paramWrapper.equals(argArray[i])) { 186 found = false; 187 break; 168 if (argArray != null) { 169 Method[] methods = this.getClass().getMethods(); 170 int length = argArray.length; 171 172 for (Method method : methods) { 173 String name = method.getName(); 174 int argsLength = method.getParameterTypes().length; 175 176 if (name.equals(methodName) && length == argsLength) { 177 // This array can have primitive types inside 178 Class<?>[] params = method.getParameterTypes(); 179 180 boolean found = true; 181 182 for (int i = 0; i < length; i++) { 183 if (params[i].isPrimitive()) { 184 Class<?> paramWrapper = primitiveClasses.get(params[i].getName()); 185 186 if (!paramWrapper.equals(argArray[i])) { 187 found = false; 188 break; 189 } 188 190 } 189 191 } 190 }191 if (found) {192 return method;192 if (found) { 193 return method; 194 } 193 195 } 194 196 } -
trunk/objectmq/test/calculatorTest/Calculator.java
r34 r38 21 21 22 22 @AsyncMethod 23 public void divideByZero() throws IOException, SerializerException; 23 public void asyncDivideByZero() throws IOException, SerializerException; 24 25 @SyncMethod 26 public int divideByZero(); 24 27 25 28 } -
trunk/objectmq/test/calculatorTest/CalculatorImpl.java
r34 r38 3 3 import java.io.IOException; 4 4 5 import omq.client.annotation.AsyncMethod;6 5 import omq.common.broker.Broker; 7 6 import omq.exception.SerializerException; … … 17 16 private static final long serialVersionUID = 1L; 18 17 18 @Override 19 19 public int add(int x, int y) { 20 20 return x + y; 21 21 } 22 22 23 @Override 23 24 public void mult(int x, int y) { 24 25 mult = x * y; … … 33 34 } 34 35 35 public void divideByZero() throws IOException, SerializerException { 36 @Override 37 public void asyncDivideByZero() throws IOException, SerializerException { 36 38 ZeroEvent ze = new ZeroEvent("my zero event", "zero-event"); 37 39 Broker.trigger(ze); … … 40 42 41 43 @Override 42 @AsyncMethod43 44 public void sendMessage(Message m) { 44 45 System.out.println("Code = "+m.getCode()); 45 46 System.out.println("Message = "+m.getMessage()); 46 47 } 47 48 48 49 @Override 50 public int divideByZero() { 51 int x = 2 / 0; 52 return x; 53 } 49 54 50 55 } -
trunk/objectmq/test/calculatorTest/ClientTest.java
r35 r38 26 26 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 27 27 env.setProperty(ParameterQueue.DURABLE_QUEUES, "false"); 28 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer. kryo);28 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java); 29 29 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false"); 30 30 … … 80 80 remoteCalc.addListener(zL); 81 81 82 remoteCalc. divideByZero();82 remoteCalc.asyncDivideByZero(); 83 83 84 84 Thread.sleep(200); … … 90 90 remoteCalc.sendMessage(m); 91 91 } 92 93 @Test(expected = ArithmeticException.class) 94 public void divideByZero() { 95 remoteCalc.divideByZero(); 96 } 92 97 } -
trunk/objectmq/test/calculatorTest/ServerTest.java
r35 r38 20 20 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 21 21 env.setProperty(ParameterQueue.DURABLE_QUEUES, "true"); 22 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer. kryo);22 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java); 23 23 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false"); 24 24
Note: See TracChangeset
for help on using the changeset viewer.