Changeset 72


Ignore:
Timestamp:
06/29/13 20:44:27 (11 years ago)
Author:
stoda
Message:

Events deleted instead of them there's a new example of how to use the observer pattern

Location:
trunk/src
Files:
3 deleted
15 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/Remote.java

    r44 r72  
    11package omq;
    22
    3 import java.io.IOException;
    43import java.io.Serializable;
    5 import java.util.Collection;
    6 
    7 import omq.common.event.Event;
    8 import omq.common.event.EventListener;
    9 import omq.exception.SerializerException;
    104
    115/**
     
    2216         */
    2317        public String getRef();
    24 
    25         public void notifyEvent(Event event) throws IOException, SerializerException;
    26 
    27         public void addListener(EventListener<?> eventListener) throws Exception;
    28 
    29         public void removeListener(EventListener<?> eventListener) throws Exception;
    30 
    31         public Collection<EventListener<?>> getListeners() throws Exception;
    3218}
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r70 r72  
    11package omq.client.proxy;
    22
    3 import java.io.IOException;
    43import java.lang.reflect.Array;
    54import java.lang.reflect.InvocationHandler;
    65import java.lang.reflect.Method;
    7 import java.util.Collection;
    86import java.util.HashMap;
    97import java.util.Map;
     
    1614import omq.client.listener.ResponseListener;
    1715import omq.common.broker.Broker;
    18 import omq.common.event.Event;
    19 import omq.common.event.EventDispatcher;
    20 import omq.common.event.EventListener;
    2116import omq.common.message.Request;
    2217import omq.common.message.Response;
     
    2520import omq.exception.OmqException;
    2621import omq.exception.RetryException;
    27 import omq.exception.SerializerException;
    2822import omq.exception.TimeoutException;
    2923
     
    5549        private transient Broker broker;
    5650        private transient ResponseListener rListener;
    57         private transient EventDispatcher dispatcher;
    5851        private transient Serializer serializer;
    5952        private transient Properties env;
    6053        private transient Map<String, byte[]> results;
    61         private transient Map<String, EventListener<?>> listeners;
    6254
    6355        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
     
    9284                this.broker = broker;
    9385                rListener = broker.getResponseListener();
    94                 dispatcher = broker.getEventDispatcher();
    9586                serializer = broker.getSerializer();
    9687
     
    10596                serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
    10697
    107                 listeners = new HashMap<String, EventListener<?>>();
    108 
    10998                // Create a new hashmap and registry it in rListener
    11099                results = new HashMap<String, byte[]>();
     
    121110                        if (methodName.equals("getRef")) {
    122111                                return getRef();
    123                         } else if (methodName.equals("addListener")) {
    124                                 addListener((EventListener<?>) arguments[0]);
    125                                 return null;
    126                         } else if (methodName.equals("removeListener")) {
    127                                 removeListener((EventListener<?>) arguments[0]);
    128                                 return null;
    129                         } else if (methodName.equals("getListeners")) {
    130                                 return getListeners();
    131112                        }
    132113                }
     
    306287        }
    307288
    308         @Override
    309         public void notifyEvent(Event event) throws IOException, SerializerException {
    310         }
    311 
    312         @Override
    313         public void addListener(EventListener<?> eventListener) throws Exception {
    314                 if (eventListener.getTopic() == null) {
    315                         eventListener.setTopic(uid);
    316                 }
    317                 listeners.put(eventListener.getTopic(), eventListener);
    318                 dispatcher.addListener(eventListener);
    319         }
    320 
    321         @Override
    322         public void removeListener(EventListener<?> eventListener) throws Exception {
    323                 listeners.remove(eventListener.getTopic());
    324                 dispatcher.removeListener(eventListener);
    325         }
    326 
    327         @Override
    328         public Collection<EventListener<?>> getListeners() throws Exception {
    329                 return listeners.values();
    330         }
    331 
    332289}
  • trunk/src/main/java/omq/common/broker/Broker.java

    r70 r72  
    1313import omq.client.proxy.MultiProxymq;
    1414import omq.client.proxy.Proxymq;
    15 import omq.common.event.Event;
    16 import omq.common.event.EventDispatcher;
    17 import omq.common.event.EventWrapper;
    1815import omq.common.util.OmqConnectionFactory;
    1916import omq.common.util.ParameterQueue;
     
    2118import omq.exception.InitBrokerException;
    2219import omq.exception.RemoteException;
    23 import omq.exception.SerializerException;
    2420import omq.server.RemoteObject;
    2521
     
    4137        private Channel channel;
    4238        private ResponseListener responseListener;
    43         private EventDispatcher eventDispatcher;
    4439        private Serializer serializer;
    4540        private boolean clientStarted = false;
     
    7570                if (clientStarted) {
    7671                        responseListener.kill();
    77                         eventDispatcher.kill();
    7872                        // TODO proxies = null; ??
    7973                }
     
    211205                        responseListener.start();
    212206                }
    213                 if (eventDispatcher == null) {
    214                         eventDispatcher = new EventDispatcher(this);
    215                         eventDispatcher.start();
    216                 }
    217         }
    218 
    219         /**
    220          * This method sends an event with its information
    221          *
    222          * @param event
    223          * @throws IOException
    224          * @throws SerializerException
    225          */
    226         public void trigger(Event event) throws IOException, SerializerException {
    227                 String UID = event.getTopic();
    228                 EventWrapper wrapper = new EventWrapper(event);
    229                 logger.debug("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());
    230                 channel.exchangeDeclare(UID, "fanout");
    231 
    232                 byte[] bytesResponse = serializer.serialize(wrapper);
    233                 channel.basicPublish(UID, "", null, bytesResponse);
    234207        }
    235208
     
    318291        }
    319292
    320         public EventDispatcher getEventDispatcher() {
    321                 return eventDispatcher;
    322         }
    323 
    324293        public Serializer getSerializer() {
    325294                return serializer;
  • trunk/src/main/java/omq/common/util/Serializer.java

    r62 r72  
    44import java.util.Properties;
    55
    6 import omq.common.event.Event;
    76import omq.common.message.Request;
    87import omq.common.message.Response;
     
    4847                        try {
    4948                                String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
    50                                
     49
    5150                                if (className == null || className.isEmpty()) {
    5251                                        throw new ClassNotFoundException("Class name is null or empty.");
    5352                                }
    54                                
     53
    5554                                serializer = getInstance(className);
    5655                        } catch (Exception ex) {
     
    6261        }
    6362
    64         public ISerializer getInstance(String type) throws SerializerException {                               
     63        public ISerializer getInstance(String type) throws SerializerException {
    6564                if (KRYO.equals(type)) {
    6665                        if (kryoSerializer == null) {
     
    7978                        return javaSerializer;
    8079                }
    81                
     80
    8281                throw new SerializerException("Serializer not found.");
    8382        }
     
    148147        }
    149148
    150         public Event deserializeEvent(byte[] bytes) throws SerializerException {
    151                 ISerializer instance = getInstance();
    152 
    153                 Boolean enableCompression = getEnableCompression();
    154                 if (enableCompression) {
    155                         try {
    156                                 byte[] unZippedBytes = Zipper.unzip(bytes);
    157                                 return instance.deserializeEvent(unZippedBytes);
    158                         } catch (IOException e) {
    159                                 throw new SerializerException(e.getMessage(), e);
    160                         }
    161                 } else {
    162                         return instance.deserializeEvent(bytes);
    163                 }
    164         }
    165 
    166         // public static void removeSerializers() {
    167         // logger.warn("Removing serializers");
    168         // serializer = null;
    169         // kryoSerializer = null;
    170         // javaSerializer = null;
    171         // gsonSerializer = null;
    172         // }
    173149}
  • trunk/src/main/java/omq/common/util/Serializers/GsonImp.java

    r50 r72  
    33import java.util.List;
    44
    5 import omq.common.event.Event;
    65import omq.common.message.Request;
    76import omq.common.message.Response;
     
    7574        }
    7675
    77         @Override
    78         public Event deserializeEvent(byte[] bytes) throws SerializerException {
    79                 try {
    80                         String json = new String(bytes);
    81 
    82                         JsonParser parser = new JsonParser();
    83                         JsonObject jsonObj = parser.parse(json).getAsJsonObject();
    84 
    85                         String type = jsonObj.get("type").getAsString();
    86 
    87                         JsonElement jsonElement = jsonObj.get("event");
    88                         Event event;
    89 
    90                         event = (Event) gson.fromJson(jsonElement, Class.forName(type));
    91 
    92                         return event;
    93                 } catch (Exception e) {
    94                         throw new SerializerException("Deserialize event", e.getCause());
    95                 }
    96         }
    97 
    9876}
  • trunk/src/main/java/omq/common/util/Serializers/ISerializer.java

    r44 r72  
    11package omq.common.util.Serializers;
    22
    3 import omq.common.event.Event;
    43import omq.common.message.Request;
    54import omq.common.message.Response;
     
    1817
    1918        public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException;
    20 
    21         public Event deserializeEvent(byte[] bytes) throws SerializerException;
    2219}
  • trunk/src/main/java/omq/common/util/Serializers/JavaImp.java

    r44 r72  
    66import java.io.ObjectOutputStream;
    77
    8 import omq.common.event.Event;
    9 import omq.common.event.EventWrapper;
    108import omq.common.message.Request;
    119import omq.common.message.Response;
     
    5149        }
    5250
    53         @Override
    54         public Event deserializeEvent(byte[] bytes) throws SerializerException {
    55                 EventWrapper wrapper = (EventWrapper) deserializeObject(bytes);
    56                 return wrapper.getEvent();
    57         }
    58 
    5951        public Object deserializeObject(byte[] bytes) throws SerializerException {
    6052                try {
  • trunk/src/main/java/omq/common/util/Serializers/KryoImp.java

    r44 r72  
    33import java.io.ByteArrayOutputStream;
    44
    5 import com.esotericsoftware.kryo.Kryo;
    6 import com.esotericsoftware.kryo.io.Input;
    7 import com.esotericsoftware.kryo.io.Output;
    8 
    9 import omq.common.event.Event;
    10 import omq.common.event.EventWrapper;
    115import omq.common.message.Request;
    126import omq.common.message.Response;
    137import omq.exception.SerializerException;
    148import omq.server.RemoteObject;
     9
     10import com.esotericsoftware.kryo.Kryo;
     11import com.esotericsoftware.kryo.io.Input;
     12import com.esotericsoftware.kryo.io.Output;
    1513
    1614/**
     
    5351        }
    5452
    55         @Override
    56         public Event deserializeEvent(byte[] bytes) throws SerializerException {
    57                 EventWrapper wrapper = (EventWrapper) deserializeObject(bytes, EventWrapper.class);
    58                 return wrapper.getEvent();
    59         }
    60 
    6153        public Object deserializeObject(byte[] bytes, Class<?> type) throws SerializerException {
    6254                try {
  • trunk/src/main/java/omq/server/RemoteObject.java

    r67 r72  
    44import java.lang.reflect.Method;
    55import java.util.ArrayList;
    6 import java.util.Collection;
    76import java.util.HashMap;
    87import java.util.List;
     
    109import java.util.Properties;
    1110
    12 import org.apache.log4j.Logger;
    13 
    1411import omq.Remote;
    1512import omq.common.broker.Broker;
    16 import omq.common.event.Event;
    17 import omq.common.event.EventListener;
    18 import omq.common.event.EventWrapper;
    1913import omq.common.util.ParameterQueue;
    20 import omq.common.util.Serializer;
    2114import omq.exception.SerializerException;
     15
     16import org.apache.log4j.Logger;
    2217
    2318import com.rabbitmq.client.Channel;
     
    4237        private Properties env;
    4338        private transient Broker broker;
    44         private transient Serializer serializer;
    4539        private transient RemoteWrapper remoteWrapper;
    4640        private transient Map<String, List<Class<?>>> params;
     
    6963                multiQueue = UID + System.currentTimeMillis();
    7064                env = broker.getEnvironment();
    71                 serializer = broker.getSerializer();
    7265
    7366                params = new HashMap<String, List<Class<?>>>();
     
    9386                this.broker = broker;
    9487                UID = reference;
    95                 serializer = broker.getSerializer();
    9688                if (channel == null || !channel.isOpen()) {
    9789                        channel = broker.getChannel();
     
    141133        }
    142134
    143         @Override
    144         public void notifyEvent(Event event) throws IOException, SerializerException {
    145                 String corrID = java.util.UUID.randomUUID().toString();
    146                 event.setTopic(UID);
    147                 event.setCorrId(corrID);
    148                 EventWrapper wrapper = new EventWrapper(event);
    149                 channel.exchangeDeclare(UID, "fanout");
    150                 channel.basicPublish(UID, "", null, serializer.serialize(wrapper));
    151                 logger.debug("Sending event-> topic: " + UID + ", corrID: " + corrID);
    152         }
    153 
    154135        public void kill() throws IOException {
    155136                logger.warn("Killing objectmq: " + this.getRef());
     
    265246        }
    266247
    267         @Override
    268         public void addListener(EventListener<?> eventListener) throws Exception {
    269         }
    270 
    271         @Override
    272         public void removeListener(EventListener<?> eventListener) throws Exception {
    273         }
    274 
    275         @Override
    276         public Collection<EventListener<?>> getListeners() throws Exception {
    277                 return null;
    278         }
    279 
    280248}
  • trunk/src/test/java/omq/test/calculator/Calculator.java

    r47 r72  
    11package omq.test.calculator;
    2 
    3 import java.io.IOException;
    42
    53import omq.Remote;
     
    75import omq.client.annotation.RemoteInterface;
    86import omq.client.annotation.SyncMethod;
    9 import omq.exception.SerializerException;
    107
    118@RemoteInterface
     
    2017        public void sendMessage(Message m);
    2118
    22         @AsyncMethod
    23         public void asyncDivideByZero() throws IOException, SerializerException;
    24 
    2519        @SyncMethod(timeout = 1500)
    2620        public int divideByZero();
  • trunk/src/test/java/omq/test/calculator/CalculatorImpl.java

    r63 r72  
    11package omq.test.calculator;
    22
    3 import java.io.IOException;
    4 
    5 import omq.common.broker.Broker;
    6 import omq.exception.SerializerException;
    73import omq.server.RemoteObject;
    84
    95public class CalculatorImpl extends RemoteObject implements Calculator {
    106        private int mult = 0;
    11         private Broker broker;
    127
    138        public CalculatorImpl() throws Exception {
    149                super();
    15         }
    16 
    17         public CalculatorImpl(Broker broker) throws Exception {
    18                 super();
    19                 this.broker = broker;
    2010        }
    2111
     
    4131
    4232        @Override
    43         public void asyncDivideByZero() throws IOException, SerializerException {
    44                 ZeroEvent ze = new ZeroEvent("my zero event", "zero-event");
    45                 broker.trigger(ze);
    46                 // notifyEvent(ze);
    47         }
    48 
    49         @Override
    5033        public void sendMessage(Message m) {
    5134                System.out.println("Code = " + m.getCode());
  • trunk/src/test/java/omq/test/calculator/CalculatorTest.java

    r62 r72  
    120120
    121121        @Test
    122         public void notifyEvent() throws Exception {
    123                 ZeroListener zL = new ZeroListener("zero-event");
    124 
    125                 remoteCalc.addListener(zL);
    126 
    127                 remoteCalc.asyncDivideByZero();
    128 
    129                 Thread.sleep(200);
    130         }
    131 
    132         @Test
    133122        public void sendMessage() throws Exception {
    134123                Message m = new Message(2334, "Hello objectmq");
  • trunk/src/test/java/omq/test/observer/ObserverTest.java

    r71 r72  
    2323        private static String OBSERVER = "observer";
    2424        private static Broker broker;
     25        private static RemoteSubjectImpl subject;
    2526        private static RemoteSubject remoteSubject;
    2627
     
    7172                env.setProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000");
    7273
     74                // Set info about the queue & the exchange where the ResponseListener
     75                // will listen to.
     76                env.setProperty(ParameterQueue.RPC_REPLY_QUEUE, "server_reply_queue");
     77                env.setProperty(ParameterQueue.EVENT_REPLY_QUEUE, "server_event_queue");
     78
    7379                Broker broker = new Broker(env);
    74                 RemoteSubjectImpl subject = new RemoteSubjectImpl();
     80                subject = new RemoteSubjectImpl(broker);
    7581                broker.bind(SUBJECT, subject);
    7682
     
    9096                RemoteObserverImpl observer = new RemoteObserverImpl();
    9197                broker.bind(OBSERVER, observer);
     98                observer.setSubject(remoteSubject);
    9299
    93                 remoteSubject.addObserver(observer);
     100                remoteSubject.addObserver(observer.getRef());
    94101                remoteSubject.setState(expected);
    95                 remoteSubject.notifyObservers();
     102                // both proxies are in the same thread for this reason since subject has
     103                // a proxy inside we need to use subject or create a new thread
     104                subject.notifyObservers();
    96105                actual = observer.getObsState();
    97106
    98107                assertEquals(expected, actual);
    99                 remoteSubject.setState(null);
     108                remoteSubject.setState("");
    100109        }
    101110
  • trunk/src/test/java/omq/test/observer/RemoteSubject.java

    r71 r72  
    44import omq.client.annotation.RemoteInterface;
    55import omq.client.annotation.SyncMethod;
     6import omq.exception.RemoteException;
    67
    78@RemoteInterface
     
    1516
    1617        @SyncMethod(timeout = 1000)
    17         public void addObserver(RemoteObserver o);
     18        public void addObserver(String ref) throws RemoteException;
    1819
    1920        @SyncMethod(timeout = 1000)
    20         public void removeObserver(RemoteObserver o);
     21        public void removeObserver(String ref) throws RemoteException;
    2122
    2223        @SyncMethod(timeout = 1000)
  • trunk/src/test/java/omq/test/observer/RemoteSubjectImpl.java

    r71 r72  
    44import java.util.List;
    55
     6import omq.common.broker.Broker;
     7import omq.exception.RemoteException;
    68import omq.server.RemoteObject;
    79
     
    1315        private static final long serialVersionUID = 1L;
    1416        private String state;
    15         private List<RemoteObserver> list = new ArrayList<RemoteObserver>();
     17        private Broker broker;
     18        private List<RemoteObserver> list;
    1619
    17         @Override
    18         public void addObserver(RemoteObserver o) {
    19                 list.add(o);
     20        public RemoteSubjectImpl(Broker broker) {
     21                this.broker = broker;
     22                list = new ArrayList<RemoteObserver>();
    2023        }
    2124
    2225        @Override
    23         public void removeObserver(RemoteObserver o) {
    24                 list.remove(o);
     26        public void addObserver(String ref) throws RemoteException {
     27                RemoteObserver obs = broker.lookup(ref, RemoteObserver.class);
     28                list.add(obs);
     29        }
     30
     31        @Override
     32        public void removeObserver(String ref) throws RemoteException {
     33                RemoteObserver obs = broker.lookup(ref, RemoteObserver.class);
     34                list.remove(obs);
    2535        }
    2636
Note: See TracChangeset for help on using the changeset viewer.