Changeset 14


Ignore:
Timestamp:
05/20/13 15:35:46 (11 years ago)
Author:
stoda
Message:

Events added

Location:
trunk/objectmq/src/omq
Files:
4 added
10 edited
1 moved

Legend:

Unmodified
Added
Removed
  • trunk/objectmq/src/omq/Remote.java

    r9 r14  
    11package omq;
    22
     3import java.io.IOException;
    34import java.io.Serializable;
     5
     6import omq.common.event.Event;
     7import omq.exception.SerializerException;
    48
    59/**
     
    1620         */
    1721        public String getRef();
     22
     23        public void notifyEvent(Event event) throws IOException, SerializerException;
    1824}
  • trunk/objectmq/src/omq/client/proxy/Proxymq.java

    r10 r14  
    1414import omq.client.annotation.SyncMethod;
    1515import omq.client.remote.response.ResponseListener;
     16import omq.common.event.Event;
    1617import omq.common.message.Request;
    1718import omq.common.message.Response;
     
    267268        }
    268269
     270        @Override
     271        public void notifyEvent(Event event) throws IOException, SerializerException {
     272
     273        }
     274
    269275}
  • trunk/objectmq/src/omq/common/broker/Broker.java

    r11 r14  
    11package omq.common.broker;
    22
     3import java.io.IOException;
    34import java.util.Properties;
    45
     
    67import omq.client.proxy.Proxymq;
    78import omq.client.remote.response.ResponseListener;
    8 import omq.common.remote.OmqConnectionFactory;
    99import omq.common.util.Environment;
     10import omq.common.util.OmqConnectionFactory;
    1011import omq.exception.EnvironmentException;
    1112import omq.exception.RemoteException;
     
    2425                } catch (EnvironmentException ex) { // environment not set.
    2526                        Environment.setEnvironment(env);
    26                         connection = OmqConnectionFactory.getConnection(env);
     27                        connection = OmqConnectionFactory.getNewConnection(env);
    2728                        channel = connection.createChannel();
    2829                }
     
    3637        public static Channel getChannel() throws Exception {
    3738                return channel;
     39        }
     40
     41        public static Channel getNewChannel() throws IOException {
     42                return connection.createChannel();
    3843        }
    3944
  • trunk/objectmq/src/omq/common/remote/RemoteListener.java

    r9 r14  
    44import java.io.IOException;
    55import java.util.Properties;
     6
     7import omq.common.util.OmqConnectionFactory;
    68
    79import com.rabbitmq.client.Channel;
     
    1416 *
    1517 */
     18//TODO aquesta classe es pot eliminar
    1619public abstract class RemoteListener extends Thread {
    1720        private static String defaultXml = "eventListener.xml";
     
    3740
    3841        private void startConnection(Properties env) throws Exception {
    39                 connection = OmqConnectionFactory.getConnection(env);
     42                connection = OmqConnectionFactory.getNewConnection(env);
    4043                channel = connection.createChannel();
    4144        }
  • trunk/objectmq/src/omq/common/util/OmqConnectionFactory.java

    r13 r14  
    1 package omq.common.remote;
     1package omq.common.util;
    22
    33import java.io.IOException;
     
    66import java.util.Properties;
    77
    8 import omq.common.util.ParameterQueue;
    9 
    10 
     8import com.rabbitmq.client.Channel;
    119import com.rabbitmq.client.Connection;
    1210import com.rabbitmq.client.ConnectionFactory;
     
    1816 */
    1917public class OmqConnectionFactory {
    20         public static Connection getConnection(Properties env) throws IOException, KeyManagementException, NoSuchAlgorithmException {
     18        private static Connection connection;
     19
     20        public static void init(Properties env) throws KeyManagementException, NoSuchAlgorithmException, IOException {
     21                if (connection == null) {
     22                        connection = getNewConnection(env);
     23                }
     24        }
     25
     26        public static Connection getNewConnection(Properties env) throws IOException, KeyManagementException, NoSuchAlgorithmException {
    2127                // Get login info of rabbitmq
    2228                String username = env.getProperty(ParameterQueue.USER_NAME);
     
    4046                return factory.newConnection();
    4147        }
     48
     49        public static Channel getNewChannel() throws IOException {
     50                return connection.createChannel();
     51        }
    4252}
  • trunk/objectmq/src/omq/common/util/Serializer.java

    r11 r14  
    44import java.util.Properties;
    55
     6import omq.common.event.Event;
    67import omq.common.message.Request;
    78import omq.common.message.Response;
     
    1920        public static ISerializer serializer;
    2021
    21         private static Boolean getEnableCompression(){
     22        private static Boolean getEnableCompression() {
    2223                Boolean enableCompression = false;
    2324                try {
    2425                        Properties env = Environment.getEnvironment();
    2526                        enableCompression = Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false"));
    26                 } catch (EnvironmentException e) { }
    27                
     27                } catch (EnvironmentException e) {
     28                }
     29
    2830                return enableCompression;
    2931        }
    30        
    31         public static ISerializer getInstance() throws SerializerException {           
     32
     33        public static ISerializer getInstance() throws SerializerException {
    3234                if (serializer == null) {
    33                         try{
    34                                 Properties env = Environment.getEnvironment();                         
     35                        try {
     36                                Properties env = Environment.getEnvironment();
    3537                                String className = env.getProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.JavaImp");
    36                                
     38
    3739                                serializer = (ISerializer) Class.forName(className).newInstance();
    38                         } catch (Exception ex){
     40                        } catch (Exception ex) {
    3941                                throw new SerializerException(ex.getMessage(), ex);
    4042                        }
    4143                }
    42                
     44
    4345                return serializer;
    4446        }
     
    4648        public static byte[] serialize(Object obj) throws SerializerException {
    4749                ISerializer instance = getInstance();
    48                
    49                 Boolean enableCompression = getEnableCompression();             
    50                 if(enableCompression){
     50
     51                Boolean enableCompression = getEnableCompression();
     52                if (enableCompression) {
    5153                        byte[] objSerialized = instance.serialize(obj);
    5254                        try {
     
    5456                        } catch (IOException e) {
    5557                                throw new SerializerException(e.getMessage(), e);
    56                         }                       
    57                 } else{
     58                        }
     59                } else {
    5860                        return instance.serialize(obj);
    59                 }               
     61                }
    6062        }
    6163
    6264        public static Request deserializeRequest(byte[] bytes, RemoteObject obj) throws SerializerException {
    6365                ISerializer instance = getInstance();
    64                                
     66
    6567                Boolean enableCompression = getEnableCompression();
    66                 if(enableCompression){
     68                if (enableCompression) {
    6769                        try {
    6870                                byte[] unZippedBytes = Zipper.unzip(bytes);
     
    7173                                throw new SerializerException(e.getMessage(), e);
    7274                        }
    73                 } else{
     75                } else {
    7476                        return instance.deserializeRequest(bytes, obj);
    75                 }                       
     77                }
    7678        }
    7779
    7880        public static Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException {
    7981                ISerializer instance = getInstance();
    80                
     82
    8183                Boolean enableCompression = getEnableCompression();
    82                 if(enableCompression){
     84                if (enableCompression) {
    8385                        try {
    8486                                byte[] unZippedBytes = Zipper.unzip(bytes);
     
    8789                                throw new SerializerException(e.getMessage(), e);
    8890                        }
    89                 } else{
     91                } else {
    9092                        return instance.deserializeResponse(bytes, type);
    91                 }                       
     93                }
     94        }
     95
     96        public static Event deserializeEvent(byte[] bytes) throws SerializerException {
     97                ISerializer instance = getInstance();
     98
     99                Boolean enableCompression = getEnableCompression();
     100                if (enableCompression) {
     101                        try {
     102                                byte[] unZippedBytes = Zipper.unzip(bytes);
     103                                return instance.deserializeEvent(unZippedBytes);
     104                        } catch (IOException e) {
     105                                throw new SerializerException(e.getMessage(), e);
     106                        }
     107                } else {
     108                        return instance.deserializeEvent(bytes);
     109                }
    92110        }
    93111}
  • trunk/objectmq/src/omq/common/util/Serializers/GsonImp.java

    r10 r14  
    33import java.util.List;
    44
     5import omq.common.event.Event;
    56import omq.common.message.Request;
    67import omq.common.message.Response;
     
    7172        }
    7273
     74        @Override
     75        public Event deserializeEvent(byte[] unZippedBytes) throws SerializerException {
     76                // TODO deserializeEvent class<?> ¿?
     77                return null;
     78        }
     79       
     80       
     81
    7382}
  • trunk/objectmq/src/omq/common/util/Serializers/ISerializer.java

    r9 r14  
    11package omq.common.util.Serializers;
    22
     3import omq.common.event.Event;
    34import omq.common.message.Request;
    45import omq.common.message.Response;
     
    1718
    1819        public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException;
     20
     21        public Event deserializeEvent(byte[] bytes) throws SerializerException;
    1922}
  • trunk/objectmq/src/omq/common/util/Serializers/JavaImp.java

    r9 r14  
    66import java.io.ObjectOutputStream;
    77
     8import omq.common.event.Event;
    89import omq.common.message.Request;
    910import omq.common.message.Response;
     
    4344        @Override
    4445        public Request deserializeRequest(byte[] bytes, RemoteObject obj) throws SerializerException {
     46                return (Request) deserliazeObject(bytes);
     47        }
     48
     49        @Override
     50        public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException {
     51                return (Response) deserliazeObject(bytes);
     52        }
     53
     54        @Override
     55        public Event deserializeEvent(byte[] bytes) throws SerializerException {
     56                return (Event) deserliazeObject(bytes);
     57        }
     58
     59        private Object deserliazeObject(byte[] bytes) throws SerializerException {
    4560                try {
    4661                        ByteArrayInputStream input = new ByteArrayInputStream(bytes);
    4762                        ObjectInputStream objInput = new ObjectInputStream(input);
    4863
    49                         Object request = objInput.readObject();
     64                        Object obj = objInput.readObject();
    5065
    5166                        objInput.close();
    5267                        input.close();
    5368
    54                         return (Request) request;
    55                 } catch (Exception e) {
    56                         throw new SerializerException("Deserialize -> " + e.getMessage(), e);
    57                 }
    58         }
    59 
    60         @Override
    61         public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException {
    62                 try {
    63                         ByteArrayInputStream input = new ByteArrayInputStream(bytes);
    64                         ObjectInputStream objInput = new ObjectInputStream(input);
    65 
    66                         Object response = objInput.readObject();
    67 
    68                         objInput.close();
    69                         input.close();
    70 
    71                         return (Response) response;
     69                        return obj;
    7270                } catch (Exception e) {
    7371                        throw new SerializerException("Deserialize -> " + e.getMessage(), e);
  • trunk/objectmq/src/omq/common/util/Serializers/KryoImp.java

    r9 r14  
    77import com.esotericsoftware.kryo.io.Output;
    88
     9import omq.common.event.Event;
    910import omq.common.message.Request;
    1011import omq.common.message.Response;
     
    4344        @Override
    4445        public Request deserializeRequest(byte[] bytes, RemoteObject obj) throws SerializerException {
    45                 try {
    46                         Input input = new Input(bytes);
    47                         Request request = kryo.readObject(input, Request.class);
    48 
    49                         input.close();
    50                         return request;
    51                 } catch (Exception e) {
    52                         throw new SerializerException("Deserialize -> " + e.getMessage(), e);
    53                 }
     46                return (Request) deserializeObject(bytes, Request.class);
    5447        }
    5548
    5649        @Override
    5750        public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException {
     51                return (Response) deserializeObject(bytes, Response.class);
     52        }
     53
     54        @Override
     55        public Event deserializeEvent(byte[] bytes) throws SerializerException {
     56                return (Event) deserializeObject(bytes, Event.class);
     57        }
     58
     59        private Object deserializeObject(byte[] bytes, Class<?> type) throws SerializerException {
    5860                try {
    5961                        Input input = new Input(bytes);
    60                         Response response = kryo.readObject(input, Response.class);
     62                        Object obj = kryo.readObject(input, type);
    6163
    6264                        input.close();
    63                         return response;
     65                        return obj;
    6466                } catch (Exception e) {
    6567                        throw new SerializerException("Deserialize -> " + e.getMessage(), e);
  • trunk/objectmq/src/omq/server/remote/request/RemoteObject.java

    r10 r14  
    1111import omq.Remote;
    1212import omq.common.broker.Broker;
     13import omq.common.event.Event;
    1314import omq.common.util.ParameterQueue;
     15import omq.common.util.Serializer;
    1416import omq.exception.SerializerException;
    1517
    1618import com.rabbitmq.client.Channel;
    17 import com.rabbitmq.client.Connection;
    1819import com.rabbitmq.client.ConsumerCancelledException;
    1920import com.rabbitmq.client.QueueingConsumer;
     
    3334        private transient RemoteWrapper remoteWrapper;
    3435        private transient Map<String, List<Class<?>>> params;
    35         private transient Connection connection;
    3636        private transient Channel channel;
    3737        private transient QueueingConsumer consumer;
     
    5151
    5252        public RemoteObject() {
     53        }
     54
     55        public void start(String reference, Properties env) throws Exception {
     56                this.UID = reference;
     57
    5358                params = new HashMap<String, List<Class<?>>>();
    5459                for (Method m : this.getClass().getMethods()) {
     
    5964                        params.put(m.getName(), list);
    6065                }
    61         }
    62 
    63         public void start(String reference, Properties env) throws Exception {
    64                 this.UID = reference;
    6566
    6667                // Get num threads to use
     
    7374                String routingKey = UID;
    7475
    75                 // Start connection and channel
    76                 connection = Broker.getConnection();
    77                 channel = connection.createChannel();
     76                // Start channel
     77                channel = Broker.getNewChannel();
    7878
    7979                // Declares and bindings
     
    118118        }
    119119
     120        @Override
     121        public void notifyEvent(Event event) throws IOException, SerializerException {
     122                event.setTopic(UID);
     123                channel.exchangeDeclare(UID, "fanout");
     124                channel.basicPublish(UID, "", null, Serializer.serialize(event));
     125        }
     126
    120127        public void kill() throws IOException {
    121128                interrupt();
    122129                killed = true;
    123130                channel.close();
    124                 connection.close();
    125131                remoteWrapper.stopRemoteWrapper();
    126132        }
Note: See TracChangeset for help on using the changeset viewer.