- Timestamp:
- 05/21/13 15:08:02 (11 years ago)
- Location:
- trunk/objectmq/src/omq
- Files:
-
- 2 added
- 12 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/objectmq/src/omq/client/annotation/AsyncMethod.java
r9 r16 16 16 @Target(ElementType.METHOD) 17 17 public @interface AsyncMethod { 18 String generateEvent() default "";19 18 } -
trunk/objectmq/src/omq/common/event/Event.java
r15 r16 17 17 private String corrId; 18 18 private String topic; 19 20 public Event() { 21 } 22 23 public Event(String corrId, String topic) { 24 this.corrId = corrId; 25 this.topic = topic; 26 } 19 27 20 28 public Event(String corrId) { -
trunk/objectmq/src/omq/common/event/EventDispatcher.java
r15 r16 110 110 111 111 public int addListener(EventListener e) throws Exception { 112 // Map<String, ArrayList<EventListener<Event>>> mListeners = 113 // listeners.get(e.getTopic()); 114 // if(mListeners == null){ 115 // mListeners = new HashMap<String, ArrayList<EventListener<Event>>>(); 116 // 117 // String queueName = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE); 118 // String reference = e.getTopic(); 119 // channel.exchangeDeclare(reference, "fanout"); 120 // channel.queueBind(queueName, reference, ""); 121 // } 122 112 123 Vector<EventListener> vListeners = listeners.get(e.getTopic()); 113 124 if (vListeners == null) { -
trunk/objectmq/src/omq/common/event/EventListener.java
r14 r16 8 8 * 9 9 */ 10 public abstract class EventListener {10 public abstract class EventListener<E extends Event> { 11 11 private String topic; 12 12 … … 31 31 * @param event 32 32 */ 33 public abstract void notifyEvent(E ventevent);33 public abstract void notifyEvent(E event); 34 34 35 35 public void setTopic(String topic) { -
trunk/objectmq/src/omq/common/util/Serializers/GsonImp.java
r14 r16 73 73 74 74 @Override 75 public Event deserializeEvent(byte[] unZippedBytes) throws SerializerException { 76 // TODO deserializeEvent class<?> ¿? 77 return null; 75 public Event deserializeEvent(byte[] bytes) throws SerializerException { 76 try { 77 String json = new String(bytes); 78 System.out.println(json); 79 JsonParser parser = new JsonParser(); 80 JsonObject jsonObj = parser.parse(json).getAsJsonObject(); 81 82 String type = jsonObj.get("type").getAsString(); 83 84 JsonElement jsonElement = jsonObj.get("event"); 85 Event event; 86 87 event = (Event) gson.fromJson(jsonElement, Class.forName(type)); 88 89 return event; 90 } catch (Exception e) { 91 throw new SerializerException("Deserialize event", e.getCause()); 92 } 78 93 } 79 80 81 94 82 95 } -
trunk/objectmq/src/omq/common/util/Serializers/KryoImp.java
r14 r16 8 8 9 9 import omq.common.event.Event; 10 import omq.common.event.EventWrapper; 10 11 import omq.common.message.Request; 11 12 import omq.common.message.Response; … … 54 55 @Override 55 56 public Event deserializeEvent(byte[] bytes) throws SerializerException { 56 return (Event) deserializeObject(bytes, Event.class); 57 EventWrapper wrapper = (EventWrapper) deserializeObject(bytes, EventWrapper.class); 58 return wrapper.getEvent(); 57 59 } 58 60 -
trunk/objectmq/src/omq/server/remote/request/RemoteObject.java
r15 r16 14 14 import omq.common.event.Event; 15 15 import omq.common.event.EventListener; 16 import omq.common.event.EventWrapper; 16 17 import omq.common.util.ParameterQueue; 17 18 import omq.common.util.Serializer; … … 123 124 public void notifyEvent(Event event) throws IOException, SerializerException { 124 125 event.setTopic(UID); 126 EventWrapper wrapper = new EventWrapper(event); 125 127 channel.exchangeDeclare(UID, "fanout"); 126 channel.basicPublish(UID, "", null, Serializer.serialize( event));128 channel.basicPublish(UID, "", null, Serializer.serialize(wrapper)); 127 129 } 128 130 -
trunk/objectmq/src/omq/ztest/calculator/Calculator.java
r15 r16 1 1 package omq.ztest.calculator; 2 3 import java.io.IOException; 2 4 3 5 import omq.Remote; … … 5 7 import omq.client.annotation.RemoteInterface; 6 8 import omq.client.annotation.SyncMethod; 9 import omq.exception.SerializerException; 7 10 8 11 @RemoteInterface … … 15 18 16 19 @AsyncMethod 17 public void divideByZero() ;20 public void divideByZero() throws IOException, SerializerException; 18 21 19 22 } -
trunk/objectmq/src/omq/ztest/calculator/CalculatorImpl.java
r15 r16 1 1 package omq.ztest.calculator; 2 2 3 import java.io.IOException; 4 5 import omq.exception.SerializerException; 3 6 import omq.server.remote.request.RemoteObject; 4 7 … … 28 31 } 29 32 30 public void divideByZero() { 31 33 public void divideByZero() throws IOException, SerializerException { 34 ZeroEvent ze = new ZeroEvent("my zero event"); 35 notifyEvent(ze); 32 36 } 33 37 -
trunk/objectmq/src/omq/ztest/calculator/CalculatorTest.java
r15 r16 24 24 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1"); 25 25 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 26 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.KryoImp"); 26 // env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.KryoImp"); 27 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.GsonImp"); 27 28 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false"); 28 29 … … 70 71 Thread.sleep(200); 71 72 } 73 74 @Test 75 public void notifyEvent() throws Exception { 76 ZeroListener zL = new ZeroListener(); 77 78 remoteCalc.addListener(zL); 79 80 remoteCalc.divideByZero(); 81 82 Thread.sleep(200); 83 } 72 84 } -
trunk/objectmq/src/omq/ztest/calculator/ServerTest.java
r15 r16 18 18 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1"); 19 19 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 20 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.KryoImp"); 20 // env.setProperty(ParameterQueue.SERIALIZERNAME, 21 // "omq.common.util.Serializers.KryoImp"); 22 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.GsonImp"); 21 23 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false"); 22 24 -
trunk/objectmq/src/omq/ztest/calculator/ZeroEvent.java
r15 r16 9 9 private static final long serialVersionUID = 1L; 10 10 11 public ZeroEvent() { 12 } 13 11 14 public ZeroEvent(String corrId) { 12 15 super(corrId); 13 16 } 14 17 18 public String getZeroMessage() { 19 return "divition by 0"; 20 } 21 15 22 }
Note: See TracChangeset
for help on using the changeset viewer.