Changeset 6


Ignore:
Timestamp:
05/09/13 17:34:07 (11 years ago)
Author:
stoda
Message:

Objectmq working (there are no remote Exceptions, and you cannot send remoteObjects through the web, but you can make proxies)

Location:
objectmq/src/omq
Files:
1 added
5 deleted
12 edited
1 copied

Legend:

Unmodified
Added
Removed
  • objectmq/src/omq/client/remote/response/ResponseListener.java

    r4 r6  
    1111import omq.common.util.Serializer;
    1212import omq.exception.SerializerException;
    13 
    1413
    1514import com.rabbitmq.client.ConsumerCancelledException;
     
    124123        }
    125124
     125        public static boolean isVoid() {
     126                return rListener == null;
     127        }
     128
    126129        /**
    127130         * Method to retrieve the unique ResponseListener
  • objectmq/src/omq/common/broker/Broker.java

    r4 r6  
    55import omq.Remote;
    66import omq.client.proxy.Proxymq;
     7import omq.client.remote.response.ResponseListener;
     8import omq.common.event.EventDispatcher;
     9import omq.common.event.EventTrigger;
    710import omq.common.remote.OmqConnectionFactory;
    811import omq.exception.RemoteException;
    912import omq.server.remote.request.RemoteObject;
    1013
     14import com.rabbitmq.client.Channel;
    1115import com.rabbitmq.client.Connection;
    1216
    1317public class Broker {
    1418        private static Connection connection;
     19        private static Channel channel;
    1520        private static Properties environment;
    1621
    1722        public static void initBroker(Properties env) throws Exception {
    18                 if (environment != null) {
     23                if (environment == null) {
     24                        environment = env;
    1925                        connection = OmqConnectionFactory.getConnection(env);
     26                        channel = connection.createChannel();
     27                        EventTrigger.initEventTrigger(environment);
    2028                }
    2129        }
     
    2634        }
    2735
     36        public static Channel getChannel() throws Exception {
     37                return channel;
     38        }
     39
    2840        public static Remote lookup(String reference, Class<?> contract) throws RemoteException {
    2941                try {
    30                         if (Proxymq.containsProxy(reference)) {
    31                                 Proxymq proxy = new Proxymq(reference, contract, null);
     42                        if (ResponseListener.isVoid()) {
     43                                ResponseListener.init(environment);
     44                        }
     45                        if (EventDispatcher.isVoid()) {
     46                                EventDispatcher.init(environment);
     47                        }
     48                        if (!Proxymq.containsProxy(reference)) {
     49                                Proxymq proxy = new Proxymq(reference, contract, environment);
    3250                                Class<?>[] array = { contract };
    3351                                return (Remote) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
     
    4058        }
    4159
    42         public static void bind(String reference, Remote obj) throws RemoteException {
     60        public static void bind(String reference, RemoteObject remote) throws RemoteException {
    4361                try {
    44                         RemoteObject remote = (RemoteObject) obj;
    4562                        remote.start(reference, environment);
    4663                } catch (Exception e) {
  • objectmq/src/omq/common/event/EventDispatcher.java

    r4 r6  
    165165        }
    166166
     167        public static boolean isVoid() {
     168                return dispatcher == null;
     169        }
     170
    167171}
  • objectmq/src/omq/common/message/request/AsyncRequest.java

    r3 r6  
    66
    77import omq.Remote;
     8import omq.common.broker.Broker;
    89import omq.common.event.Event;
    910import omq.common.message.response.Response;
    1011import omq.common.util.ParameterQueue;
    1112import omq.common.util.Serializer;
    12 import omq.server.remote.request.RequestListener;
    13 
    1413
    1514import com.rabbitmq.client.AMQP.BasicProperties;
     
    3029        private String topic;
    3130
    32         public AsyncRequest() { 
     31        public AsyncRequest() {
    3332                super();
    3433        }
     
    6766                // published to everybody interested in.
    6867                if (topic != null) {
    69                         Channel channel = RequestListener.getRequestListener().getChannel();
     68                        Channel channel = Broker.getChannel();
    7069                        channel.exchangeDeclare(topic, "fanout");
    7170
     
    7372
    7473                        channel.basicPublish(topic, "", null, Serializer.serialize(event));
    75                         channel.close();
    7674                }
    7775        }
     
    8179                // Get the environment properties
    8280                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
    83                 String routingkey = env.getProperty(ParameterQueue.RPC_ROUTING_KEY);
     81                String routingkey = this.uid;
    8482
    8583                // Add the correlation ID and create a replyTo property
  • objectmq/src/omq/common/message/request/SyncRequest.java

    r3 r6  
    66
    77import omq.Remote;
     8import omq.common.broker.Broker;
    89import omq.common.message.response.Response;
    910import omq.common.util.ParameterQueue;
     
    1112import omq.exception.RetryException;
    1213import omq.exception.TimeoutException;
    13 import omq.server.remote.request.RequestListener;
    14 
    1514
    1615import com.rabbitmq.client.AMQP.BasicProperties;
     
    3029        private long timeout;
    3130        private int retries;
    32        
    33        
     31
    3432        public SyncRequest() {
    3533                super();
    3634        }
    37        
     35
    3836        /**
    3937         * This is the constructor of a synchronous request
     
    6260
    6361                // Send the response to the proxy
    64                 Channel channel = RequestListener.getRequestListener().getChannel();
     62                Channel channel = Broker.getChannel();
    6563
    6664                BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
    6765
    6866                channel.basicPublish("", props.getReplyTo(), replyProps, Serializer.serialize(resp));
    69 
    70                 channel.close();
    7167        }
    7268
     
    9187                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
    9288                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    93                 String routingkey = env.getProperty(ParameterQueue.RPC_ROUTING_KEY);
     89                String routingkey = this.uid;
    9490
    9591                // Add the correlation ID and create a replyTo property
     
    122118                }
    123119
    124                 return resp.getResp(env);
     120                // TODO why I need env?
     121                return resp.getResp();
    125122        }
    126123
    127        
    128124        public long getTimeout() {
    129125                return timeout;
     
    141137                this.retries = retries;
    142138        }
    143        
     139
    144140}
  • objectmq/src/omq/common/message/response/Response.java

    r3 r6  
    1111 *
    1212 */
    13 public abstract class Response implements Serializable {
     13public class Response implements Serializable {
    1414        /**
    1515         *
     
    1717        private static final long serialVersionUID = 1L;
    1818
    19         protected String proxyUID;
     19        private String proxyUID;
     20        private Object obj;
    2021
    21         public Response() { }
    22 
    23         public Response(String proxyUID) {
    24                 this.proxyUID = proxyUID;
     22        public Response() {
    2523        }
    2624
    27         public abstract Object getResp(Properties env) throws Exception;
    28 
    29         public abstract Object getResp() throws Exception;
     25        public Response(String proxyUID, Object obj) {
     26                this.proxyUID = proxyUID;
     27                this.obj = obj;
     28        }
    3029
    3130        public String getProxyUID() {
     
    3736        }
    3837
     38        public Object getResp() {
     39                return obj;
     40        }
     41
     42        public void setObj(Object obj) {
     43                this.obj = obj;
     44        }
    3945}
  • objectmq/src/omq/common/util/Serializer.java

    r3 r6  
    11package omq.common.util;
    22
    3 import omq.common.util.Serializers.GsonImp;
    43import omq.common.util.Serializers.ISerializer;
    5 import omq.common.util.Serializers.JavaImp;
    6 import omq.common.util.Serializers.JsonImp;
    74import omq.common.util.Serializers.KryoImp;
    8 import omq.common.util.Serializers.XmlImp;
    9 import omq.common.util.Serializers.YamlImp;
    105import omq.exception.SerializerException;
    116
     
    2116        public static ISerializer getInstance() {
    2217                if (serializer == null) {
    23                         serializer = new JavaImp();//Working
    24                         //serializer = new KryoImp();//Working
    25                         //serializer = new YamlImp();//Working
    26                         //serializer = new JsonImp();
    27                         //serializer = new GsonImp();
    28                         //serializer = new XmlImp();//Working
     18                        // serializer = new JavaImp();//Working
     19                        serializer = new KryoImp();// Working
     20                        // serializer = new YamlImp();//Working
     21                        // serializer = new JsonImp();
     22                        // serializer = new GsonImp();
     23                        // serializer = new XmlImp();//Working
    2924                }
    3025                return serializer;
  • objectmq/src/omq/common/util/Serializers/GsonImp.java

    r3 r6  
    11package omq.common.util.Serializers;
    22
    3 import omq.common.message.request.AsyncRequest;
    4 import omq.common.message.request.SyncRequest;
    5 import omq.common.message.response.CollectionResponse;
    6 import omq.common.message.response.DefaultResponse;
    7 import omq.common.message.response.ExceptionResponse;
    8 import omq.common.message.response.ProxyResponse;
    9 import omq.common.message.response.Response;
    103import omq.exception.SerializerException;
    114
    125import com.google.gson.Gson;
    136
    14 
    157public class GsonImp implements ISerializer {
    168        private final Gson gson = new Gson();
    17        
     9
    1810        @Override
    1911        public byte[] serialize(Object obj) throws SerializerException {
     
    2113                return output.getBytes();
    2214        }
    23        
     15
    2416        @Override
    2517        public byte[] serialize(Object obj, Class<?> clazz) throws SerializerException {
    2618                return serialize(obj);
    27         }       
     19        }
    2820
    29         private Object tryDeserialize(byte[] bytes, Class<?> clazz) throws Exception{
     21        private Object tryDeserialize(byte[] bytes, Class<?> clazz) throws Exception {
    3022                String input = new String(bytes);
    3123                Object object = gson.fromJson(input, clazz);
    3224                return object;
    3325        }
    34        
     26
    3527        @Override
    3628        public Object deserialize(byte[] bytes) throws SerializerException {
    37                 try {
    38                         return tryDeserialize(bytes, SyncRequest.class);
    39                 } catch (Exception e) {
    40                         try {
    41                                 return tryDeserialize(bytes, AsyncRequest.class);
    42                         } catch (Exception e1) {                               
    43                                 try {
    44                                         return tryDeserialize(bytes, CollectionResponse.class);                                 
    45                                 } catch (Exception e2) {
    46                                         try {
    47                                                 return tryDeserialize(bytes, DefaultResponse.class);                                   
    48                                         } catch (Exception e3) {                                               
    49                                                 try {
    50                                                         return tryDeserialize(bytes, ExceptionResponse.class);                                 
    51                                                 } catch (Exception e4) {                                                       
    52                                                         try {
    53                                                                 return tryDeserialize(bytes, ProxyResponse.class);                                     
    54                                                         } catch (Exception e5) {                                                               
    55                                                                 try {
    56                                                                         return tryDeserialize(bytes, Response.class);                                   
    57                                                                 } catch (Exception e6) {
    58                                                                         throw new SerializerException("Deserialize -> " + e6.getMessage(), e6);
    59                                                                 }                                                               
    60                                                         }                                                       
    61                                                 }                                               
    62                                         }
    63                                 }                               
    64                         }
    65                 }               
     29                // try {
     30                // return tryDeserialize(bytes, SyncRequest.class);
     31                // } catch (Exception e) {
     32                // try {
     33                // return tryDeserialize(bytes, AsyncRequest.class);
     34                // } catch (Exception e1) {
     35                // try {
     36                // return tryDeserialize(bytes, CollectionResponse.class);
     37                // } catch (Exception e2) {
     38                // try {
     39                // return tryDeserialize(bytes, DefaultResponse.class);
     40                // } catch (Exception e3) {
     41                // try {
     42                // return tryDeserialize(bytes, ExceptionResponse.class);
     43                // } catch (Exception e4) {
     44                // try {
     45                // return tryDeserialize(bytes, ProxyResponse.class);
     46                // } catch (Exception e5) {
     47                // try {
     48                // return tryDeserialize(bytes, Response.class);
     49                // } catch (Exception e6) {
     50                // throw new SerializerException("Deserialize -> " + e6.getMessage(),
     51                // e6);
     52                // }
     53                // }
     54                // }
     55                // }
     56                // }
     57                // }
     58                // }
     59                return null;
    6660        }
    6761
  • objectmq/src/omq/common/util/Serializers/JsonImp.java

    r3 r6  
    11package omq.common.util.Serializers;
    22
    3 import omq.common.message.request.AsyncRequest;
    4 import omq.common.message.request.SyncRequest;
    5 import omq.common.message.response.CollectionResponse;
    6 import omq.common.message.response.DefaultResponse;
    7 import omq.common.message.response.ExceptionResponse;
    8 import omq.common.message.response.ProxyResponse;
    9 import omq.common.message.response.Response;
    103import omq.exception.SerializerException;
    114
     
    2013public class JsonImp implements ISerializer {
    2114        private final ObjectMapper mapper = new ObjectMapper();
    22        
     15
    2316        @Override
    2417        public byte[] serialize(Object obj) throws SerializerException {
     
    3023                }
    3124        }
    32        
     25
    3326        @Override
    3427        public byte[] serialize(Object obj, Class<?> clazz) throws SerializerException {
    3528                return serialize(obj);
    36         }       
    37        
    38        
    39         private Object tryDeserialize(byte[] bytes, Class<?> clazz) throws Exception{
     29        }
     30
     31        private Object tryDeserialize(byte[] bytes, Class<?> clazz) throws Exception {
    4032                Object object = mapper.readValue(bytes, clazz);
    4133                return object;
    4234        }
    43        
    4435
    4536        @Override
    4637        public Object deserialize(byte[] bytes) throws SerializerException {
    47                 try {
    48                         return tryDeserialize(bytes, SyncRequest.class);
    49                 } catch (Exception e) {
    50                         try {
    51                                 return tryDeserialize(bytes, AsyncRequest.class);
    52                         } catch (Exception e1) {                               
    53                                 try {
    54                                         return tryDeserialize(bytes, CollectionResponse.class);                                 
    55                                 } catch (Exception e2) {
    56                                         try {
    57                                                 return tryDeserialize(bytes, DefaultResponse.class);                                   
    58                                         } catch (Exception e3) {                                               
    59                                                 try {
    60                                                         return tryDeserialize(bytes, ExceptionResponse.class);                                 
    61                                                 } catch (Exception e4) {                                                       
    62                                                         try {
    63                                                                 return tryDeserialize(bytes, ProxyResponse.class);                                     
    64                                                         } catch (Exception e5) {                                                               
    65                                                                 try {
    66                                                                         return tryDeserialize(bytes, Response.class);                                   
    67                                                                 } catch (Exception e6) {
    68                                                                         throw new SerializerException("Deserialize -> " + e6.getMessage(), e6);
    69                                                                 }                                                               
    70                                                         }                                                       
    71                                                 }                                               
    72                                         }
    73                                 }                               
    74                         }
    75                 }
     38                // try {
     39                // return tryDeserialize(bytes, SyncRequest.class);
     40                // } catch (Exception e) {
     41                // try {
     42                // return tryDeserialize(bytes, AsyncRequest.class);
     43                // } catch (Exception e1) {
     44                // try {
     45                // return tryDeserialize(bytes, CollectionResponse.class);
     46                // } catch (Exception e2) {
     47                // try {
     48                // return tryDeserialize(bytes, DefaultResponse.class);
     49                // } catch (Exception e3) {
     50                // try {
     51                // return tryDeserialize(bytes, ExceptionResponse.class);
     52                // } catch (Exception e4) {
     53                // try {
     54                // return tryDeserialize(bytes, ProxyResponse.class);
     55                // } catch (Exception e5) {
     56                // try {
     57                // return tryDeserialize(bytes, Response.class);
     58                // } catch (Exception e6) {
     59                // throw new SerializerException("Deserialize -> " + e6.getMessage(),
     60                // e6);
     61                // }
     62                // }
     63                // }
     64                // }
     65                // }
     66                // }
     67                // }
     68                return null;
    7669        }
    7770
  • objectmq/src/omq/server/remote/request/RemoteObject.java

    r4 r6  
    1414import omq.common.event.EventListener;
    1515import omq.common.event.EventTrigger;
    16 import omq.common.message.response.DefaultResponse;
    17 import omq.common.message.response.ExceptionResponse;
    1816import omq.common.message.response.Response;
    1917import omq.common.util.ParameterQueue;
     
    3735
    3836        private String UID;
    39         private RemoteWrapper remoteWrapper;
    40         private Connection connection;
    41         private Channel channel;
    42         private QueueingConsumer consumer;
    43         private boolean killed = false;
     37        private transient RemoteWrapper remoteWrapper;
     38        private transient Connection connection;
     39        private transient Channel channel;
     40        private transient QueueingConsumer consumer;
     41        private transient boolean killed = false;
    4442
    4543        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
     
    5957
    6058                // Get num threads to use
    61                 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS));
     59                int numThreads = 4;//Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS));
    6260                remoteWrapper = new RemoteWrapper(this, numThreads);
    6361
     
    9088        public void run() {
    9189                while (!killed) {
    92                         try {
     90                        try {System.out.println("Consuming ...");
    9391                                Delivery delivery = consumer.nextDelivery();
     92                                System.out.println("Consuming a delivery");
    9493                                remoteWrapper.notifyDelivery(delivery);
    9594                        } catch (InterruptedException i) {
     
    135134                try {
    136135                        Object result = method.invoke(this, arguments);
    137                         return new DefaultResponse(this.getRef(), result);
     136                        return new Response(this.getRef(), result);
    138137                } catch (InvocationTargetException e) {
    139                         return new ExceptionResponse(this.getRef(), e.getTargetException());
     138                        return new Response(this.getRef(), e.getTargetException());
    140139                }
    141140        }
  • objectmq/src/omq/ztest/Serializers/Car.java

    r3 r6  
    1 package omq.z.test.Serializers;
     1package omq.ztest.Serializers;
    22
    33import java.io.Serializable;
  • objectmq/src/omq/ztest/Serializers/Test.java

    r3 r6  
    1 package omq.z.test.Serializers;
     1package omq.ztest.Serializers;
    22
    33import omq.common.util.Serializers.GsonImp;
     
    99import omq.common.util.Serializers.YamlImp;
    1010import omq.exception.SerializerException;
    11 import omq.z.test.Serializers.Car.Trademark;
     11import omq.ztest.Serializers.Car.Trademark;
    1212
    1313/**
Note: See TracChangeset for help on using the changeset viewer.