Changeset 72 for trunk/src/main


Ignore:
Timestamp:
06/29/13 20:44:27 (11 years ago)
Author:
stoda
Message:

Events deleted instead of them there's a new example of how to use the observer pattern

Location:
trunk/src/main/java/omq
Files:
1 deleted
9 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/Remote.java

    r44 r72  
    11package omq;
    22
    3 import java.io.IOException;
    43import java.io.Serializable;
    5 import java.util.Collection;
    6 
    7 import omq.common.event.Event;
    8 import omq.common.event.EventListener;
    9 import omq.exception.SerializerException;
    104
    115/**
     
    2216         */
    2317        public String getRef();
    24 
    25         public void notifyEvent(Event event) throws IOException, SerializerException;
    26 
    27         public void addListener(EventListener<?> eventListener) throws Exception;
    28 
    29         public void removeListener(EventListener<?> eventListener) throws Exception;
    30 
    31         public Collection<EventListener<?>> getListeners() throws Exception;
    3218}
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r70 r72  
    11package omq.client.proxy;
    22
    3 import java.io.IOException;
    43import java.lang.reflect.Array;
    54import java.lang.reflect.InvocationHandler;
    65import java.lang.reflect.Method;
    7 import java.util.Collection;
    86import java.util.HashMap;
    97import java.util.Map;
     
    1614import omq.client.listener.ResponseListener;
    1715import omq.common.broker.Broker;
    18 import omq.common.event.Event;
    19 import omq.common.event.EventDispatcher;
    20 import omq.common.event.EventListener;
    2116import omq.common.message.Request;
    2217import omq.common.message.Response;
     
    2520import omq.exception.OmqException;
    2621import omq.exception.RetryException;
    27 import omq.exception.SerializerException;
    2822import omq.exception.TimeoutException;
    2923
     
    5549        private transient Broker broker;
    5650        private transient ResponseListener rListener;
    57         private transient EventDispatcher dispatcher;
    5851        private transient Serializer serializer;
    5952        private transient Properties env;
    6053        private transient Map<String, byte[]> results;
    61         private transient Map<String, EventListener<?>> listeners;
    6254
    6355        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
     
    9284                this.broker = broker;
    9385                rListener = broker.getResponseListener();
    94                 dispatcher = broker.getEventDispatcher();
    9586                serializer = broker.getSerializer();
    9687
     
    10596                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
    10697
    107                 listeners = new HashMap<String, EventListener<?>>();
    108 
    10998                // Create a new hashmap and registry it in rListener
    11099                results = new HashMap<String, byte[]>();
     
    121110                        if (methodName.equals("getRef")) {
    122111                                return getRef();
    123                         } else if (methodName.equals("addListener")) {
    124                                 addListener((EventListener<?>) arguments[0]);
    125                                 return null;
    126                         } else if (methodName.equals("removeListener")) {
    127                                 removeListener((EventListener<?>) arguments[0]);
    128                                 return null;
    129                         } else if (methodName.equals("getListeners")) {
    130                                 return getListeners();
    131112                        }
    132113                }
     
    306287        }
    307288
    308         @Override
    309         public void notifyEvent(Event event) throws IOException, SerializerException {
    310         }
    311 
    312         @Override
    313         public void addListener(EventListener<?> eventListener) throws Exception {
    314                 if (eventListener.getTopic() == null) {
    315                         eventListener.setTopic(uid);
    316                 }
    317                 listeners.put(eventListener.getTopic(), eventListener);
    318                 dispatcher.addListener(eventListener);
    319         }
    320 
    321         @Override
    322         public void removeListener(EventListener<?> eventListener) throws Exception {
    323                 listeners.remove(eventListener.getTopic());
    324                 dispatcher.removeListener(eventListener);
    325         }
    326 
    327         @Override
    328         public Collection<EventListener<?>> getListeners() throws Exception {
    329                 return listeners.values();
    330         }
    331 
    332289}
  • trunk/src/main/java/omq/common/broker/Broker.java

    r70 r72  
    1313import omq.client.proxy.MultiProxymq;
    1414import omq.client.proxy.Proxymq;
    15 import omq.common.event.Event;
    16 import omq.common.event.EventDispatcher;
    17 import omq.common.event.EventWrapper;
    1815import omq.common.util.OmqConnectionFactory;
    1916import omq.common.util.ParameterQueue;
     
    2118import omq.exception.InitBrokerException;
    2219import omq.exception.RemoteException;
    23 import omq.exception.SerializerException;
    2420import omq.server.RemoteObject;
    2521
     
    4137        private Channel channel;
    4238        private ResponseListener responseListener;
    43         private EventDispatcher eventDispatcher;
    4439        private Serializer serializer;
    4540        private boolean clientStarted = false;
     
    7570                if (clientStarted) {
    7671                        responseListener.kill();
    77                         eventDispatcher.kill();
    7872                        // TODO proxies = null; ??
    7973                }
     
    211205                        responseListener.start();
    212206                }
    213                 if (eventDispatcher == null) {
    214                         eventDispatcher = new EventDispatcher(this);
    215                         eventDispatcher.start();
    216                 }
    217         }
    218 
    219         /**
    220          * This method sends an event with its information
    221          *
    222          * @param event
    223          * @throws IOException
    224          * @throws SerializerException
    225          */
    226         public void trigger(Event event) throws IOException, SerializerException {
    227                 String UID = event.getTopic();
    228                 EventWrapper wrapper = new EventWrapper(event);
    229                 logger.debug("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
    230                 channel.exchangeDeclare(UID, "fanout");
    231 
    232                 byte[] bytesResponse = serializer.serialize(wrapper);
    233                 channel.basicPublish(UID, "", null, bytesResponse);
    234207        }
    235208
     
    318291        }
    319292
    320         public EventDispatcher getEventDispatcher() {
    321                 return eventDispatcher;
    322         }
    323 
    324293        public Serializer getSerializer() {
    325294                return serializer;
  • trunk/src/main/java/omq/common/util/Serializer.java

    r62 r72  
    44import java.util.Properties;
    55
    6 import omq.common.event.Event;
    76import omq.common.message.Request;
    87import omq.common.message.Response;
     
    4847                        try {
    4948                                String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
    50                                
     49
    5150                                if (className == null || className.isEmpty()) {
    5251                                        throw new ClassNotFoundException("Class name is null or empty.");
    5352                                }
    54                                
     53
    5554                                serializer = getInstance(className);
    5655                        } catch (Exception ex) {
     
    6261        }
    6362
    64         public ISerializer getInstance(String type) throws SerializerException {                               
     63        public ISerializer getInstance(String type) throws SerializerException {
    6564                if (KRYO.equals(type)) {
    6665                        if (kryoSerializer == null) {
     
    7978                        return javaSerializer;
    8079                }
    81                
     80
    8281                throw new SerializerException("Serializer not found.");
    8382        }
     
    148147        }
    149148
    150         public Event deserializeEvent(byte[] bytes) throws SerializerException {
    151                 ISerializer instance = getInstance();
    152 
    153                 Boolean enableCompression = getEnableCompression();
    154                 if (enableCompression) {
    155                         try {
    156                                 byte[] unZippedBytes = Zipper.unzip(bytes);
    157                                 return instance.deserializeEvent(unZippedBytes);
    158                         } catch (IOException e) {
    159                                 throw new SerializerException(e.getMessage(), e);
    160                         }
    161                 } else {
    162                         return instance.deserializeEvent(bytes);
    163                 }
    164         }
    165 
    166         // public static void removeSerializers() {
    167         // logger.warn("Removing serializers");
    168         // serializer = null;
    169         // kryoSerializer = null;
    170         // javaSerializer = null;
    171         // gsonSerializer = null;
    172         // }
    173149}
  • trunk/src/main/java/omq/common/util/Serializers/GsonImp.java

    r50 r72  
    33import java.util.List;
    44
    5 import omq.common.event.Event;
    65import omq.common.message.Request;
    76import omq.common.message.Response;
     
    7574        }
    7675
    77         @Override
    78         public Event deserializeEvent(byte[] bytes) throws SerializerException {
    79                 try {
    80                         String json = new String(bytes);
    81 
    82                         JsonParser parser = new JsonParser();
    83                         JsonObject jsonObj = parser.parse(json).getAsJsonObject();
    84 
    85                         String type = jsonObj.get("type").getAsString();
    86 
    87                         JsonElement jsonElement = jsonObj.get("event");
    88                         Event event;
    89 
    90                         event = (Event) gson.fromJson(jsonElement, Class.forName(type));
    91 
    92                         return event;
    93                 } catch (Exception e) {
    94                         throw new SerializerException("Deserialize event", e.getCause());
    95                 }
    96         }
    97 
    9876}
  • trunk/src/main/java/omq/common/util/Serializers/ISerializer.java

    r44 r72  
    11package omq.common.util.Serializers;
    22
    3 import omq.common.event.Event;
    43import omq.common.message.Request;
    54import omq.common.message.Response;
     
    1817
    1918        public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException;
    20 
    21         public Event deserializeEvent(byte[] bytes) throws SerializerException;
    2219}
  • trunk/src/main/java/omq/common/util/Serializers/JavaImp.java

    r44 r72  
    66import java.io.ObjectOutputStream;
    77
    8 import omq.common.event.Event;
    9 import omq.common.event.EventWrapper;
    108import omq.common.message.Request;
    119import omq.common.message.Response;
     
    5149        }
    5250
    53         @Override
    54         public Event deserializeEvent(byte[] bytes) throws SerializerException {
    55                 EventWrapper wrapper = (EventWrapper) deserializeObject(bytes);
    56                 return wrapper.getEvent();
    57         }
    58 
    5951        public Object deserializeObject(byte[] bytes) throws SerializerException {
    6052                try {
  • trunk/src/main/java/omq/common/util/Serializers/KryoImp.java

    r44 r72  
    33import java.io.ByteArrayOutputStream;
    44
    5 import com.esotericsoftware.kryo.Kryo;
    6 import com.esotericsoftware.kryo.io.Input;
    7 import com.esotericsoftware.kryo.io.Output;
    8 
    9 import omq.common.event.Event;
    10 import omq.common.event.EventWrapper;
    115import omq.common.message.Request;
    126import omq.common.message.Response;
    137import omq.exception.SerializerException;
    148import omq.server.RemoteObject;
     9
     10import com.esotericsoftware.kryo.Kryo;
     11import com.esotericsoftware.kryo.io.Input;
     12import com.esotericsoftware.kryo.io.Output;
    1513
    1614/**
     
    5351        }
    5452
    55         @Override
    56         public Event deserializeEvent(byte[] bytes) throws SerializerException {
    57                 EventWrapper wrapper = (EventWrapper) deserializeObject(bytes, EventWrapper.class);
    58                 return wrapper.getEvent();
    59         }
    60 
    6153        public Object deserializeObject(byte[] bytes, Class<?> type) throws SerializerException {
    6254                try {
  • trunk/src/main/java/omq/server/RemoteObject.java

    r67 r72  
    44import java.lang.reflect.Method;
    55import java.util.ArrayList;
    6 import java.util.Collection;
    76import java.util.HashMap;
    87import java.util.List;
     
    109import java.util.Properties;
    1110
    12 import org.apache.log4j.Logger;
    13 
    1411import omq.Remote;
    1512import omq.common.broker.Broker;
    16 import omq.common.event.Event;
    17 import omq.common.event.EventListener;
    18 import omq.common.event.EventWrapper;
    1913import omq.common.util.ParameterQueue;
    20 import omq.common.util.Serializer;
    2114import omq.exception.SerializerException;
     15
     16import org.apache.log4j.Logger;
    2217
    2318import com.rabbitmq.client.Channel;
     
    4237        private Properties env;
    4338        private transient Broker broker;
    44         private transient Serializer serializer;
    4539        private transient RemoteWrapper remoteWrapper;
    4640        private transient Map<String, List<Class<?>>> params;
     
    6963                multiQueue = UID + System.currentTimeMillis();
    7064                env = broker.getEnvironment();
    71                 serializer = broker.getSerializer();
    7265
    7366                params = new HashMap<String, List<Class<?>>>();
     
    9386                this.broker = broker;
    9487                UID = reference;
    95                 serializer = broker.getSerializer();
    9688                if (channel == null || !channel.isOpen()) {
    9789                        channel = broker.getChannel();
     
    141133        }
    142134
    143         @Override
    144         public void notifyEvent(Event event) throws IOException, SerializerException {
    145                 String corrID = java.util.UUID.randomUUID().toString();
    146                 event.setTopic(UID);
    147                 event.setCorrId(corrID);
    148                 EventWrapper wrapper = new EventWrapper(event);
    149                 channel.exchangeDeclare(UID, "fanout");
    150                 channel.basicPublish(UID, "", null, serializer.serialize(wrapper));
    151                 logger.debug("Sending event-> topic: " + UID + ", corrID: " + corrID);
    152         }
    153 
    154135        public void kill() throws IOException {
    155136                logger.warn("Killing objectmq: " + this.getRef());
     
    265246        }
    266247
    267         @Override
    268         public void addListener(EventListener<?> eventListener) throws Exception {
    269         }
    270 
    271         @Override
    272         public void removeListener(EventListener<?> eventListener) throws Exception {
    273         }
    274 
    275         @Override
    276         public Collection<EventListener<?>> getListeners() throws Exception {
    277                 return null;
    278         }
    279 
    280248}
Note: See TracChangeset for help on using the changeset viewer.