Changeset 16


Ignore:
Timestamp:
05/21/13 15:08:02 (11 years ago)
Author:
stoda
Message:

EventWrapperAdded?

Location:
trunk/objectmq/src/omq
Files:
2 added
12 edited

Legend:

Unmodified
Added
Removed
  • trunk/objectmq/src/omq/client/annotation/AsyncMethod.java

    r9 r16  
    1616@Target(ElementType.METHOD)
    1717public @interface AsyncMethod {
    18         String generateEvent() default "";
    1918}
  • trunk/objectmq/src/omq/common/event/Event.java

    r15 r16  
    1717        private String corrId;
    1818        private String topic;
     19
     20        public Event() {
     21        }
     22
     23        public Event(String corrId, String topic) {
     24                this.corrId = corrId;
     25                this.topic = topic;
     26        }
    1927
    2028        public Event(String corrId) {
  • trunk/objectmq/src/omq/common/event/EventDispatcher.java

    r15 r16  
    110110
    111111        public int addListener(EventListener e) throws Exception {
     112                // Map<String, ArrayList<EventListener<Event>>> mListeners =
     113                // listeners.get(e.getTopic());
     114                // if(mListeners == null){
     115                // mListeners = new HashMap<String, ArrayList<EventListener<Event>>>();
     116                //
     117                // String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
     118                // String reference = e.getTopic();
     119                // channel.exchangeDeclare(reference, "fanout");
     120                // channel.queueBind(queueName, reference, "");
     121                // }
     122
    112123                Vector<EventListener> vListeners = listeners.get(e.getTopic());
    113124                if (vListeners == null) {
  • trunk/objectmq/src/omq/common/event/EventListener.java

    r14 r16  
    88 *
    99 */
    10 public abstract class EventListener {
     10public abstract class EventListener<E extends Event> {
    1111        private String topic;
    1212
     
    3131         * @param event
    3232         */
    33         public abstract void notifyEvent(Event event);
     33        public abstract void notifyEvent(E event);
    3434
    3535        public void setTopic(String topic) {
  • trunk/objectmq/src/omq/common/util/Serializers/GsonImp.java

    r14 r16  
    7373
    7474        @Override
    75         public Event deserializeEvent(byte[] unZippedBytes) throws SerializerException {
    76                 // TODO deserializeEvent class<?> ¿?
    77                 return null;
     75        public Event deserializeEvent(byte[] bytes) throws SerializerException {
     76                try {
     77                        String json = new String(bytes);
     78System.out.println(json);
     79                        JsonParser parser = new JsonParser();
     80                        JsonObject jsonObj = parser.parse(json).getAsJsonObject();
     81
     82                        String type = jsonObj.get("type").getAsString();
     83
     84                        JsonElement jsonElement = jsonObj.get("event");
     85                        Event event;
     86
     87                        event = (Event) gson.fromJson(jsonElement, Class.forName(type));
     88
     89                        return event;
     90                } catch (Exception e) {
     91                        throw new SerializerException("Deserialize event", e.getCause());
     92                }
    7893        }
    79        
    80        
    8194
    8295}
  • trunk/objectmq/src/omq/common/util/Serializers/KryoImp.java

    r14 r16  
    88
    99import omq.common.event.Event;
     10import omq.common.event.EventWrapper;
    1011import omq.common.message.Request;
    1112import omq.common.message.Response;
     
    5455        @Override
    5556        public Event deserializeEvent(byte[] bytes) throws SerializerException {
    56                 return (Event) deserializeObject(bytes, Event.class);
     57                EventWrapper wrapper = (EventWrapper) deserializeObject(bytes, EventWrapper.class);
     58                return wrapper.getEvent();
    5759        }
    5860
  • trunk/objectmq/src/omq/server/remote/request/RemoteObject.java

    r15 r16  
    1414import omq.common.event.Event;
    1515import omq.common.event.EventListener;
     16import omq.common.event.EventWrapper;
    1617import omq.common.util.ParameterQueue;
    1718import omq.common.util.Serializer;
     
    123124        public void notifyEvent(Event event) throws IOException, SerializerException {
    124125                event.setTopic(UID);
     126                EventWrapper wrapper = new EventWrapper(event);
    125127                channel.exchangeDeclare(UID, "fanout");
    126                 channel.basicPublish(UID, "", null, Serializer.serialize(event));
     128                channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));
    127129        }
    128130
  • trunk/objectmq/src/omq/ztest/calculator/Calculator.java

    r15 r16  
    11package omq.ztest.calculator;
     2
     3import java.io.IOException;
    24
    35import omq.Remote;
     
    57import omq.client.annotation.RemoteInterface;
    68import omq.client.annotation.SyncMethod;
     9import omq.exception.SerializerException;
    710
    811@RemoteInterface
     
    1518       
    1619        @AsyncMethod
    17         public void divideByZero();
     20        public void divideByZero() throws IOException, SerializerException;
    1821
    1922}
  • trunk/objectmq/src/omq/ztest/calculator/CalculatorImpl.java

    r15 r16  
    11package omq.ztest.calculator;
    22
     3import java.io.IOException;
     4
     5import omq.exception.SerializerException;
    36import omq.server.remote.request.RemoteObject;
    47
     
    2831        }
    2932
    30         public void divideByZero() {
    31                
     33        public void divideByZero() throws IOException, SerializerException {
     34                ZeroEvent ze = new ZeroEvent("my zero event");
     35                notifyEvent(ze);
    3236        }
    3337
  • trunk/objectmq/src/omq/ztest/calculator/CalculatorTest.java

    r15 r16  
    2424                env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
    2525                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    26                 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.KryoImp");
     26//              env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.KryoImp");
     27                env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.GsonImp");
    2728                env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");
    2829
     
    7071                Thread.sleep(200);
    7172        }
     73
     74        @Test
     75        public void notifyEvent() throws Exception {
     76                ZeroListener zL = new ZeroListener();
     77
     78                remoteCalc.addListener(zL);
     79
     80                remoteCalc.divideByZero();
     81
     82                Thread.sleep(200);
     83        }
    7284}
  • trunk/objectmq/src/omq/ztest/calculator/ServerTest.java

    r15 r16  
    1818                env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
    1919                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    20                 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.KryoImp");
     20                // env.setProperty(ParameterQueue.SERIALIZERNAME,
     21                // "omq.common.util.Serializers.KryoImp");
     22                env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.GsonImp");
    2123                env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");
    2224
  • trunk/objectmq/src/omq/ztest/calculator/ZeroEvent.java

    r15 r16  
    99        private static final long serialVersionUID = 1L;
    1010
     11        public ZeroEvent() {
     12        }
     13
    1114        public ZeroEvent(String corrId) {
    1215                super(corrId);
    1316        }
    1417
     18        public String getZeroMessage() {
     19                return "divition by 0";
     20        }
     21
    1522}
Note: See TracChangeset for help on using the changeset viewer.