Changeset 6
- Timestamp:
- 05/09/13 17:34:07 (12 years ago)
- Location:
- objectmq/src/omq
- Files:
-
- 1 added
- 5 deleted
- 12 edited
- 1 copied
Legend:
- Unmodified
- Added
- Removed
-
objectmq/src/omq/client/remote/response/ResponseListener.java
r4 r6 11 11 import omq.common.util.Serializer; 12 12 import omq.exception.SerializerException; 13 14 13 15 14 import com.rabbitmq.client.ConsumerCancelledException; … … 124 123 } 125 124 125 public static boolean isVoid() { 126 return rListener == null; 127 } 128 126 129 /** 127 130 * Method to retrieve the unique ResponseListener -
objectmq/src/omq/common/broker/Broker.java
r4 r6 5 5 import omq.Remote; 6 6 import omq.client.proxy.Proxymq; 7 import omq.client.remote.response.ResponseListener; 8 import omq.common.event.EventDispatcher; 9 import omq.common.event.EventTrigger; 7 10 import omq.common.remote.OmqConnectionFactory; 8 11 import omq.exception.RemoteException; 9 12 import omq.server.remote.request.RemoteObject; 10 13 14 import com.rabbitmq.client.Channel; 11 15 import com.rabbitmq.client.Connection; 12 16 13 17 public class Broker { 14 18 private static Connection connection; 19 private static Channel channel; 15 20 private static Properties environment; 16 21 17 22 public static void initBroker(Properties env) throws Exception { 18 if (environment != null) { 23 if (environment == null) { 24 environment = env; 19 25 connection = OmqConnectionFactory.getConnection(env); 26 channel = connection.createChannel(); 27 EventTrigger.initEventTrigger(environment); 20 28 } 21 29 } … … 26 34 } 27 35 36 public static Channel getChannel() throws Exception { 37 return channel; 38 } 39 28 40 public static Remote lookup(String reference, Class<?> contract) throws RemoteException { 29 41 try { 30 if (Proxymq.containsProxy(reference)) { 31 Proxymq proxy = new Proxymq(reference, contract, null); 42 if (ResponseListener.isVoid()) { 43 ResponseListener.init(environment); 44 } 45 if (EventDispatcher.isVoid()) { 46 EventDispatcher.init(environment); 47 } 48 if (!Proxymq.containsProxy(reference)) { 49 Proxymq proxy = new Proxymq(reference, contract, environment); 32 50 Class<?>[] array = { contract }; 33 51 return (Remote) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy); … … 40 58 } 41 59 42 public static void bind(String reference, Remote obj) throws RemoteException {60 public static void bind(String reference, RemoteObject remote) throws RemoteException { 43 61 try { 44 RemoteObject remote = (RemoteObject) obj;45 62 remote.start(reference, environment); 46 63 } catch (Exception e) { -
objectmq/src/omq/common/event/EventDispatcher.java
r4 r6 165 165 } 166 166 167 public static boolean isVoid() { 168 return dispatcher == null; 169 } 170 167 171 } -
objectmq/src/omq/common/message/request/AsyncRequest.java
r3 r6 6 6 7 7 import omq.Remote; 8 import omq.common.broker.Broker; 8 9 import omq.common.event.Event; 9 10 import omq.common.message.response.Response; 10 11 import omq.common.util.ParameterQueue; 11 12 import omq.common.util.Serializer; 12 import omq.server.remote.request.RequestListener;13 14 13 15 14 import com.rabbitmq.client.AMQP.BasicProperties; … … 30 29 private String topic; 31 30 32 public AsyncRequest() { 31 public AsyncRequest() { 33 32 super(); 34 33 } … … 67 66 // published to everybody interested in. 68 67 if (topic != null) { 69 Channel channel = RequestListener.getRequestListener().getChannel();68 Channel channel = Broker.getChannel(); 70 69 channel.exchangeDeclare(topic, "fanout"); 71 70 … … 73 72 74 73 channel.basicPublish(topic, "", null, Serializer.serialize(event)); 75 channel.close();76 74 } 77 75 } … … 81 79 // Get the environment properties 82 80 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE); 83 String routingkey = env.getProperty(ParameterQueue.RPC_ROUTING_KEY);81 String routingkey = this.uid; 84 82 85 83 // Add the correlation ID and create a replyTo property -
objectmq/src/omq/common/message/request/SyncRequest.java
r3 r6 6 6 7 7 import omq.Remote; 8 import omq.common.broker.Broker; 8 9 import omq.common.message.response.Response; 9 10 import omq.common.util.ParameterQueue; … … 11 12 import omq.exception.RetryException; 12 13 import omq.exception.TimeoutException; 13 import omq.server.remote.request.RequestListener;14 15 14 16 15 import com.rabbitmq.client.AMQP.BasicProperties; … … 30 29 private long timeout; 31 30 private int retries; 32 33 31 34 32 public SyncRequest() { 35 33 super(); 36 34 } 37 35 38 36 /** 39 37 * This is the constructor of a synchronous request … … 62 60 63 61 // Send the response to the proxy 64 Channel channel = RequestListener.getRequestListener().getChannel();62 Channel channel = Broker.getChannel(); 65 63 66 64 BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build(); 67 65 68 66 channel.basicPublish("", props.getReplyTo(), replyProps, Serializer.serialize(resp)); 69 70 channel.close();71 67 } 72 68 … … 91 87 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE); 92 88 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); 93 String routingkey = env.getProperty(ParameterQueue.RPC_ROUTING_KEY);89 String routingkey = this.uid; 94 90 95 91 // Add the correlation ID and create a replyTo property … … 122 118 } 123 119 124 return resp.getResp(env); 120 // TODO why I need env? 121 return resp.getResp(); 125 122 } 126 123 127 128 124 public long getTimeout() { 129 125 return timeout; … … 141 137 this.retries = retries; 142 138 } 143 139 144 140 } -
objectmq/src/omq/common/message/response/Response.java
r3 r6 11 11 * 12 12 */ 13 public abstractclass Response implements Serializable {13 public class Response implements Serializable { 14 14 /** 15 15 * … … 17 17 private static final long serialVersionUID = 1L; 18 18 19 protected String proxyUID; 19 private String proxyUID; 20 private Object obj; 20 21 21 public Response() { } 22 23 public Response(String proxyUID) { 24 this.proxyUID = proxyUID; 22 public Response() { 25 23 } 26 24 27 public abstract Object getResp(Properties env) throws Exception; 28 29 public abstract Object getResp() throws Exception; 25 public Response(String proxyUID, Object obj) { 26 this.proxyUID = proxyUID; 27 this.obj = obj; 28 } 30 29 31 30 public String getProxyUID() { … … 37 36 } 38 37 38 public Object getResp() { 39 return obj; 40 } 41 42 public void setObj(Object obj) { 43 this.obj = obj; 44 } 39 45 } -
objectmq/src/omq/common/util/Serializer.java
r3 r6 1 1 package omq.common.util; 2 2 3 import omq.common.util.Serializers.GsonImp;4 3 import omq.common.util.Serializers.ISerializer; 5 import omq.common.util.Serializers.JavaImp;6 import omq.common.util.Serializers.JsonImp;7 4 import omq.common.util.Serializers.KryoImp; 8 import omq.common.util.Serializers.XmlImp;9 import omq.common.util.Serializers.YamlImp;10 5 import omq.exception.SerializerException; 11 6 … … 21 16 public static ISerializer getInstance() { 22 17 if (serializer == null) { 23 serializer = new JavaImp();//Working24 //serializer = new KryoImp();//Working25 // serializer = new YamlImp();//Working26 // serializer = new JsonImp();27 // serializer = new GsonImp();28 // serializer = new XmlImp();//Working18 // serializer = new JavaImp();//Working 19 serializer = new KryoImp();// Working 20 // serializer = new YamlImp();//Working 21 // serializer = new JsonImp(); 22 // serializer = new GsonImp(); 23 // serializer = new XmlImp();//Working 29 24 } 30 25 return serializer; -
objectmq/src/omq/common/util/Serializers/GsonImp.java
r3 r6 1 1 package omq.common.util.Serializers; 2 2 3 import omq.common.message.request.AsyncRequest;4 import omq.common.message.request.SyncRequest;5 import omq.common.message.response.CollectionResponse;6 import omq.common.message.response.DefaultResponse;7 import omq.common.message.response.ExceptionResponse;8 import omq.common.message.response.ProxyResponse;9 import omq.common.message.response.Response;10 3 import omq.exception.SerializerException; 11 4 12 5 import com.google.gson.Gson; 13 6 14 15 7 public class GsonImp implements ISerializer { 16 8 private final Gson gson = new Gson(); 17 9 18 10 @Override 19 11 public byte[] serialize(Object obj) throws SerializerException { … … 21 13 return output.getBytes(); 22 14 } 23 15 24 16 @Override 25 17 public byte[] serialize(Object obj, Class<?> clazz) throws SerializerException { 26 18 return serialize(obj); 27 } 19 } 28 20 29 private Object tryDeserialize(byte[] bytes, Class<?> clazz) throws Exception {21 private Object tryDeserialize(byte[] bytes, Class<?> clazz) throws Exception { 30 22 String input = new String(bytes); 31 23 Object object = gson.fromJson(input, clazz); 32 24 return object; 33 25 } 34 26 35 27 @Override 36 28 public Object deserialize(byte[] bytes) throws SerializerException { 37 try { 38 return tryDeserialize(bytes, SyncRequest.class); 39 } catch (Exception e) { 40 try { 41 return tryDeserialize(bytes, AsyncRequest.class); 42 } catch (Exception e1) { 43 try { 44 return tryDeserialize(bytes, CollectionResponse.class); 45 } catch (Exception e2) { 46 try { 47 return tryDeserialize(bytes, DefaultResponse.class); 48 } catch (Exception e3) { 49 try { 50 return tryDeserialize(bytes, ExceptionResponse.class); 51 } catch (Exception e4) { 52 try { 53 return tryDeserialize(bytes, ProxyResponse.class); 54 } catch (Exception e5) { 55 try { 56 return tryDeserialize(bytes, Response.class); 57 } catch (Exception e6) { 58 throw new SerializerException("Deserialize -> " + e6.getMessage(), e6); 59 } 60 } 61 } 62 } 63 } 64 } 65 } 29 // try { 30 // return tryDeserialize(bytes, SyncRequest.class); 31 // } catch (Exception e) { 32 // try { 33 // return tryDeserialize(bytes, AsyncRequest.class); 34 // } catch (Exception e1) { 35 // try { 36 // return tryDeserialize(bytes, CollectionResponse.class); 37 // } catch (Exception e2) { 38 // try { 39 // return tryDeserialize(bytes, DefaultResponse.class); 40 // } catch (Exception e3) { 41 // try { 42 // return tryDeserialize(bytes, ExceptionResponse.class); 43 // } catch (Exception e4) { 44 // try { 45 // return tryDeserialize(bytes, ProxyResponse.class); 46 // } catch (Exception e5) { 47 // try { 48 // return tryDeserialize(bytes, Response.class); 49 // } catch (Exception e6) { 50 // throw new SerializerException("Deserialize -> " + e6.getMessage(), 51 // e6); 52 // } 53 // } 54 // } 55 // } 56 // } 57 // } 58 // } 59 return null; 66 60 } 67 61 -
objectmq/src/omq/common/util/Serializers/JsonImp.java
r3 r6 1 1 package omq.common.util.Serializers; 2 2 3 import omq.common.message.request.AsyncRequest;4 import omq.common.message.request.SyncRequest;5 import omq.common.message.response.CollectionResponse;6 import omq.common.message.response.DefaultResponse;7 import omq.common.message.response.ExceptionResponse;8 import omq.common.message.response.ProxyResponse;9 import omq.common.message.response.Response;10 3 import omq.exception.SerializerException; 11 4 … … 20 13 public class JsonImp implements ISerializer { 21 14 private final ObjectMapper mapper = new ObjectMapper(); 22 15 23 16 @Override 24 17 public byte[] serialize(Object obj) throws SerializerException { … … 30 23 } 31 24 } 32 25 33 26 @Override 34 27 public byte[] serialize(Object obj, Class<?> clazz) throws SerializerException { 35 28 return serialize(obj); 36 } 37 38 39 private Object tryDeserialize(byte[] bytes, Class<?> clazz) throws Exception{ 29 } 30 31 private Object tryDeserialize(byte[] bytes, Class<?> clazz) throws Exception { 40 32 Object object = mapper.readValue(bytes, clazz); 41 33 return object; 42 34 } 43 44 35 45 36 @Override 46 37 public Object deserialize(byte[] bytes) throws SerializerException { 47 try { 48 return tryDeserialize(bytes, SyncRequest.class); 49 } catch (Exception e) { 50 try { 51 return tryDeserialize(bytes, AsyncRequest.class); 52 } catch (Exception e1) { 53 try { 54 return tryDeserialize(bytes, CollectionResponse.class); 55 } catch (Exception e2) { 56 try { 57 return tryDeserialize(bytes, DefaultResponse.class); 58 } catch (Exception e3) { 59 try { 60 return tryDeserialize(bytes, ExceptionResponse.class); 61 } catch (Exception e4) { 62 try { 63 return tryDeserialize(bytes, ProxyResponse.class); 64 } catch (Exception e5) { 65 try { 66 return tryDeserialize(bytes, Response.class); 67 } catch (Exception e6) { 68 throw new SerializerException("Deserialize -> " + e6.getMessage(), e6); 69 } 70 } 71 } 72 } 73 } 74 } 75 } 38 // try { 39 // return tryDeserialize(bytes, SyncRequest.class); 40 // } catch (Exception e) { 41 // try { 42 // return tryDeserialize(bytes, AsyncRequest.class); 43 // } catch (Exception e1) { 44 // try { 45 // return tryDeserialize(bytes, CollectionResponse.class); 46 // } catch (Exception e2) { 47 // try { 48 // return tryDeserialize(bytes, DefaultResponse.class); 49 // } catch (Exception e3) { 50 // try { 51 // return tryDeserialize(bytes, ExceptionResponse.class); 52 // } catch (Exception e4) { 53 // try { 54 // return tryDeserialize(bytes, ProxyResponse.class); 55 // } catch (Exception e5) { 56 // try { 57 // return tryDeserialize(bytes, Response.class); 58 // } catch (Exception e6) { 59 // throw new SerializerException("Deserialize -> " + e6.getMessage(), 60 // e6); 61 // } 62 // } 63 // } 64 // } 65 // } 66 // } 67 // } 68 return null; 76 69 } 77 70 -
objectmq/src/omq/server/remote/request/RemoteObject.java
r4 r6 14 14 import omq.common.event.EventListener; 15 15 import omq.common.event.EventTrigger; 16 import omq.common.message.response.DefaultResponse;17 import omq.common.message.response.ExceptionResponse;18 16 import omq.common.message.response.Response; 19 17 import omq.common.util.ParameterQueue; … … 37 35 38 36 private String UID; 39 private RemoteWrapper remoteWrapper;40 private Connection connection;41 private Channel channel;42 private QueueingConsumer consumer;43 private boolean killed = false;37 private transient RemoteWrapper remoteWrapper; 38 private transient Connection connection; 39 private transient Channel channel; 40 private transient QueueingConsumer consumer; 41 private transient boolean killed = false; 44 42 45 43 private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>(); … … 59 57 60 58 // Get num threads to use 61 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS));59 int numThreads = 4;//Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS)); 62 60 remoteWrapper = new RemoteWrapper(this, numThreads); 63 61 … … 90 88 public void run() { 91 89 while (!killed) { 92 try { 90 try {System.out.println("Consuming ..."); 93 91 Delivery delivery = consumer.nextDelivery(); 92 System.out.println("Consuming a delivery"); 94 93 remoteWrapper.notifyDelivery(delivery); 95 94 } catch (InterruptedException i) { … … 135 134 try { 136 135 Object result = method.invoke(this, arguments); 137 return new DefaultResponse(this.getRef(), result);136 return new Response(this.getRef(), result); 138 137 } catch (InvocationTargetException e) { 139 return new ExceptionResponse(this.getRef(), e.getTargetException());138 return new Response(this.getRef(), e.getTargetException()); 140 139 } 141 140 } -
objectmq/src/omq/ztest/Serializers/Car.java
r3 r6 1 package omq.z .test.Serializers;1 package omq.ztest.Serializers; 2 2 3 3 import java.io.Serializable; -
objectmq/src/omq/ztest/Serializers/Test.java
r3 r6 1 package omq.z .test.Serializers;1 package omq.ztest.Serializers; 2 2 3 3 import omq.common.util.Serializers.GsonImp; … … 9 9 import omq.common.util.Serializers.YamlImp; 10 10 import omq.exception.SerializerException; 11 import omq.z .test.Serializers.Car.Trademark;11 import omq.ztest.Serializers.Car.Trademark; 12 12 13 13 /**
Note: See TracChangeset
for help on using the changeset viewer.