Changeset 72
- Timestamp:
- 06/29/13 20:44:27 (11 years ago)
- Location:
- trunk/src
- Files:
-
- 3 deleted
- 15 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/Remote.java
r44 r72 1 1 package omq; 2 2 3 import java.io.IOException;4 3 import java.io.Serializable; 5 import java.util.Collection;6 7 import omq.common.event.Event;8 import omq.common.event.EventListener;9 import omq.exception.SerializerException;10 4 11 5 /** … … 22 16 */ 23 17 public String getRef(); 24 25 public void notifyEvent(Event event) throws IOException, SerializerException;26 27 public void addListener(EventListener<?> eventListener) throws Exception;28 29 public void removeListener(EventListener<?> eventListener) throws Exception;30 31 public Collection<EventListener<?>> getListeners() throws Exception;32 18 } -
trunk/src/main/java/omq/client/proxy/Proxymq.java
r70 r72 1 1 package omq.client.proxy; 2 2 3 import java.io.IOException;4 3 import java.lang.reflect.Array; 5 4 import java.lang.reflect.InvocationHandler; 6 5 import java.lang.reflect.Method; 7 import java.util.Collection;8 6 import java.util.HashMap; 9 7 import java.util.Map; … … 16 14 import omq.client.listener.ResponseListener; 17 15 import omq.common.broker.Broker; 18 import omq.common.event.Event;19 import omq.common.event.EventDispatcher;20 import omq.common.event.EventListener;21 16 import omq.common.message.Request; 22 17 import omq.common.message.Response; … … 25 20 import omq.exception.OmqException; 26 21 import omq.exception.RetryException; 27 import omq.exception.SerializerException;28 22 import omq.exception.TimeoutException; 29 23 … … 55 49 private transient Broker broker; 56 50 private transient ResponseListener rListener; 57 private transient EventDispatcher dispatcher;58 51 private transient Serializer serializer; 59 52 private transient Properties env; 60 53 private transient Map<String, byte[]> results; 61 private transient Map<String, EventListener<?>> listeners;62 54 63 55 private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>(); … … 92 84 this.broker = broker; 93 85 rListener = broker.getResponseListener(); 94 dispatcher = broker.getEventDispatcher();95 86 serializer = broker.getSerializer(); 96 87 … … 105 96 serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA); 106 97 107 listeners = new HashMap<String, EventListener<?>>();108 109 98 // Create a new hashmap and registry it in rListener 110 99 results = new HashMap<String, byte[]>(); … … 121 110 if (methodName.equals("getRef")) { 122 111 return getRef(); 123 } else if (methodName.equals("addListener")) {124 addListener((EventListener<?>) arguments[0]);125 return null;126 } else if (methodName.equals("removeListener")) {127 removeListener((EventListener<?>) arguments[0]);128 return null;129 } else if (methodName.equals("getListeners")) {130 return getListeners();131 112 } 132 113 } … … 306 287 } 307 288 308 @Override309 public void notifyEvent(Event event) throws IOException, SerializerException {310 }311 312 @Override313 public void addListener(EventListener<?> eventListener) throws Exception {314 if (eventListener.getTopic() == null) {315 eventListener.setTopic(uid);316 }317 listeners.put(eventListener.getTopic(), eventListener);318 dispatcher.addListener(eventListener);319 }320 321 @Override322 public void removeListener(EventListener<?> eventListener) throws Exception {323 listeners.remove(eventListener.getTopic());324 dispatcher.removeListener(eventListener);325 }326 327 @Override328 public Collection<EventListener<?>> getListeners() throws Exception {329 return listeners.values();330 }331 332 289 } -
trunk/src/main/java/omq/common/broker/Broker.java
r70 r72 13 13 import omq.client.proxy.MultiProxymq; 14 14 import omq.client.proxy.Proxymq; 15 import omq.common.event.Event;16 import omq.common.event.EventDispatcher;17 import omq.common.event.EventWrapper;18 15 import omq.common.util.OmqConnectionFactory; 19 16 import omq.common.util.ParameterQueue; … … 21 18 import omq.exception.InitBrokerException; 22 19 import omq.exception.RemoteException; 23 import omq.exception.SerializerException;24 20 import omq.server.RemoteObject; 25 21 … … 41 37 private Channel channel; 42 38 private ResponseListener responseListener; 43 private EventDispatcher eventDispatcher;44 39 private Serializer serializer; 45 40 private boolean clientStarted = false; … … 75 70 if (clientStarted) { 76 71 responseListener.kill(); 77 eventDispatcher.kill();78 72 // TODO proxies = null; ?? 79 73 } … … 211 205 responseListener.start(); 212 206 } 213 if (eventDispatcher == null) {214 eventDispatcher = new EventDispatcher(this);215 eventDispatcher.start();216 }217 }218 219 /**220 * This method sends an event with its information221 *222 * @param event223 * @throws IOException224 * @throws SerializerException225 */226 public void trigger(Event event) throws IOException, SerializerException {227 String UID = event.getTopic();228 EventWrapper wrapper = new EventWrapper(event);229 logger.debug("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());230 channel.exchangeDeclare(UID, "fanout");231 232 byte[] bytesResponse = serializer.serialize(wrapper);233 channel.basicPublish(UID, "", null, bytesResponse);234 207 } 235 208 … … 318 291 } 319 292 320 public EventDispatcher getEventDispatcher() {321 return eventDispatcher;322 }323 324 293 public Serializer getSerializer() { 325 294 return serializer; -
trunk/src/main/java/omq/common/util/Serializer.java
r62 r72 4 4 import java.util.Properties; 5 5 6 import omq.common.event.Event;7 6 import omq.common.message.Request; 8 7 import omq.common.message.Response; … … 48 47 try { 49 48 String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA); 50 49 51 50 if (className == null || className.isEmpty()) { 52 51 throw new ClassNotFoundException("Class name is null or empty."); 53 52 } 54 53 55 54 serializer = getInstance(className); 56 55 } catch (Exception ex) { … … 62 61 } 63 62 64 public ISerializer getInstance(String type) throws SerializerException { 63 public ISerializer getInstance(String type) throws SerializerException { 65 64 if (KRYO.equals(type)) { 66 65 if (kryoSerializer == null) { … … 79 78 return javaSerializer; 80 79 } 81 80 82 81 throw new SerializerException("Serializer not found."); 83 82 } … … 148 147 } 149 148 150 public Event deserializeEvent(byte[] bytes) throws SerializerException {151 ISerializer instance = getInstance();152 153 Boolean enableCompression = getEnableCompression();154 if (enableCompression) {155 try {156 byte[] unZippedBytes = Zipper.unzip(bytes);157 return instance.deserializeEvent(unZippedBytes);158 } catch (IOException e) {159 throw new SerializerException(e.getMessage(), e);160 }161 } else {162 return instance.deserializeEvent(bytes);163 }164 }165 166 // public static void removeSerializers() {167 // logger.warn("Removing serializers");168 // serializer = null;169 // kryoSerializer = null;170 // javaSerializer = null;171 // gsonSerializer = null;172 // }173 149 } -
trunk/src/main/java/omq/common/util/Serializers/GsonImp.java
r50 r72 3 3 import java.util.List; 4 4 5 import omq.common.event.Event;6 5 import omq.common.message.Request; 7 6 import omq.common.message.Response; … … 75 74 } 76 75 77 @Override78 public Event deserializeEvent(byte[] bytes) throws SerializerException {79 try {80 String json = new String(bytes);81 82 JsonParser parser = new JsonParser();83 JsonObject jsonObj = parser.parse(json).getAsJsonObject();84 85 String type = jsonObj.get("type").getAsString();86 87 JsonElement jsonElement = jsonObj.get("event");88 Event event;89 90 event = (Event) gson.fromJson(jsonElement, Class.forName(type));91 92 return event;93 } catch (Exception e) {94 throw new SerializerException("Deserialize event", e.getCause());95 }96 }97 98 76 } -
trunk/src/main/java/omq/common/util/Serializers/ISerializer.java
r44 r72 1 1 package omq.common.util.Serializers; 2 2 3 import omq.common.event.Event;4 3 import omq.common.message.Request; 5 4 import omq.common.message.Response; … … 18 17 19 18 public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException; 20 21 public Event deserializeEvent(byte[] bytes) throws SerializerException;22 19 } -
trunk/src/main/java/omq/common/util/Serializers/JavaImp.java
r44 r72 6 6 import java.io.ObjectOutputStream; 7 7 8 import omq.common.event.Event;9 import omq.common.event.EventWrapper;10 8 import omq.common.message.Request; 11 9 import omq.common.message.Response; … … 51 49 } 52 50 53 @Override54 public Event deserializeEvent(byte[] bytes) throws SerializerException {55 EventWrapper wrapper = (EventWrapper) deserializeObject(bytes);56 return wrapper.getEvent();57 }58 59 51 public Object deserializeObject(byte[] bytes) throws SerializerException { 60 52 try { -
trunk/src/main/java/omq/common/util/Serializers/KryoImp.java
r44 r72 3 3 import java.io.ByteArrayOutputStream; 4 4 5 import com.esotericsoftware.kryo.Kryo;6 import com.esotericsoftware.kryo.io.Input;7 import com.esotericsoftware.kryo.io.Output;8 9 import omq.common.event.Event;10 import omq.common.event.EventWrapper;11 5 import omq.common.message.Request; 12 6 import omq.common.message.Response; 13 7 import omq.exception.SerializerException; 14 8 import omq.server.RemoteObject; 9 10 import com.esotericsoftware.kryo.Kryo; 11 import com.esotericsoftware.kryo.io.Input; 12 import com.esotericsoftware.kryo.io.Output; 15 13 16 14 /** … … 53 51 } 54 52 55 @Override56 public Event deserializeEvent(byte[] bytes) throws SerializerException {57 EventWrapper wrapper = (EventWrapper) deserializeObject(bytes, EventWrapper.class);58 return wrapper.getEvent();59 }60 61 53 public Object deserializeObject(byte[] bytes, Class<?> type) throws SerializerException { 62 54 try { -
trunk/src/main/java/omq/server/RemoteObject.java
r67 r72 4 4 import java.lang.reflect.Method; 5 5 import java.util.ArrayList; 6 import java.util.Collection;7 6 import java.util.HashMap; 8 7 import java.util.List; … … 10 9 import java.util.Properties; 11 10 12 import org.apache.log4j.Logger;13 14 11 import omq.Remote; 15 12 import omq.common.broker.Broker; 16 import omq.common.event.Event;17 import omq.common.event.EventListener;18 import omq.common.event.EventWrapper;19 13 import omq.common.util.ParameterQueue; 20 import omq.common.util.Serializer;21 14 import omq.exception.SerializerException; 15 16 import org.apache.log4j.Logger; 22 17 23 18 import com.rabbitmq.client.Channel; … … 42 37 private Properties env; 43 38 private transient Broker broker; 44 private transient Serializer serializer;45 39 private transient RemoteWrapper remoteWrapper; 46 40 private transient Map<String, List<Class<?>>> params; … … 69 63 multiQueue = UID + System.currentTimeMillis(); 70 64 env = broker.getEnvironment(); 71 serializer = broker.getSerializer();72 65 73 66 params = new HashMap<String, List<Class<?>>>(); … … 93 86 this.broker = broker; 94 87 UID = reference; 95 serializer = broker.getSerializer();96 88 if (channel == null || !channel.isOpen()) { 97 89 channel = broker.getChannel(); … … 141 133 } 142 134 143 @Override144 public void notifyEvent(Event event) throws IOException, SerializerException {145 String corrID = java.util.UUID.randomUUID().toString();146 event.setTopic(UID);147 event.setCorrId(corrID);148 EventWrapper wrapper = new EventWrapper(event);149 channel.exchangeDeclare(UID, "fanout");150 channel.basicPublish(UID, "", null, serializer.serialize(wrapper));151 logger.debug("Sending event-> topic: " + UID + ", corrID: " + corrID);152 }153 154 135 public void kill() throws IOException { 155 136 logger.warn("Killing objectmq: " + this.getRef()); … … 265 246 } 266 247 267 @Override268 public void addListener(EventListener<?> eventListener) throws Exception {269 }270 271 @Override272 public void removeListener(EventListener<?> eventListener) throws Exception {273 }274 275 @Override276 public Collection<EventListener<?>> getListeners() throws Exception {277 return null;278 }279 280 248 } -
trunk/src/test/java/omq/test/calculator/Calculator.java
r47 r72 1 1 package omq.test.calculator; 2 3 import java.io.IOException;4 2 5 3 import omq.Remote; … … 7 5 import omq.client.annotation.RemoteInterface; 8 6 import omq.client.annotation.SyncMethod; 9 import omq.exception.SerializerException;10 7 11 8 @RemoteInterface … … 20 17 public void sendMessage(Message m); 21 18 22 @AsyncMethod23 public void asyncDivideByZero() throws IOException, SerializerException;24 25 19 @SyncMethod(timeout = 1500) 26 20 public int divideByZero(); -
trunk/src/test/java/omq/test/calculator/CalculatorImpl.java
r63 r72 1 1 package omq.test.calculator; 2 2 3 import java.io.IOException;4 5 import omq.common.broker.Broker;6 import omq.exception.SerializerException;7 3 import omq.server.RemoteObject; 8 4 9 5 public class CalculatorImpl extends RemoteObject implements Calculator { 10 6 private int mult = 0; 11 private Broker broker;12 7 13 8 public CalculatorImpl() throws Exception { 14 9 super(); 15 }16 17 public CalculatorImpl(Broker broker) throws Exception {18 super();19 this.broker = broker;20 10 } 21 11 … … 41 31 42 32 @Override 43 public void asyncDivideByZero() throws IOException, SerializerException {44 ZeroEvent ze = new ZeroEvent("my zero event", "zero-event");45 broker.trigger(ze);46 // notifyEvent(ze);47 }48 49 @Override50 33 public void sendMessage(Message m) { 51 34 System.out.println("Code = " + m.getCode()); -
trunk/src/test/java/omq/test/calculator/CalculatorTest.java
r62 r72 120 120 121 121 @Test 122 public void notifyEvent() throws Exception {123 ZeroListener zL = new ZeroListener("zero-event");124 125 remoteCalc.addListener(zL);126 127 remoteCalc.asyncDivideByZero();128 129 Thread.sleep(200);130 }131 132 @Test133 122 public void sendMessage() throws Exception { 134 123 Message m = new Message(2334, "Hello objectmq"); -
trunk/src/test/java/omq/test/observer/ObserverTest.java
r71 r72 23 23 private static String OBSERVER = "observer"; 24 24 private static Broker broker; 25 private static RemoteSubjectImpl subject; 25 26 private static RemoteSubject remoteSubject; 26 27 … … 71 72 env.setProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"); 72 73 74 // Set info about the queue & the exchange where the ResponseListener 75 // will listen to. 76 env.setProperty(ParameterQueue.RPC_REPLY_QUEUE, "server_reply_queue"); 77 env.setProperty(ParameterQueue.EVENT_REPLY_QUEUE, "server_event_queue"); 78 73 79 Broker broker = new Broker(env); 74 RemoteSubjectImpl subject = new RemoteSubjectImpl();80 subject = new RemoteSubjectImpl(broker); 75 81 broker.bind(SUBJECT, subject); 76 82 … … 90 96 RemoteObserverImpl observer = new RemoteObserverImpl(); 91 97 broker.bind(OBSERVER, observer); 98 observer.setSubject(remoteSubject); 92 99 93 remoteSubject.addObserver(observer );100 remoteSubject.addObserver(observer.getRef()); 94 101 remoteSubject.setState(expected); 95 remoteSubject.notifyObservers(); 102 // both proxies are in the same thread for this reason since subject has 103 // a proxy inside we need to use subject or create a new thread 104 subject.notifyObservers(); 96 105 actual = observer.getObsState(); 97 106 98 107 assertEquals(expected, actual); 99 remoteSubject.setState( null);108 remoteSubject.setState(""); 100 109 } 101 110 -
trunk/src/test/java/omq/test/observer/RemoteSubject.java
r71 r72 4 4 import omq.client.annotation.RemoteInterface; 5 5 import omq.client.annotation.SyncMethod; 6 import omq.exception.RemoteException; 6 7 7 8 @RemoteInterface … … 15 16 16 17 @SyncMethod(timeout = 1000) 17 public void addObserver( RemoteObserver o);18 public void addObserver(String ref) throws RemoteException; 18 19 19 20 @SyncMethod(timeout = 1000) 20 public void removeObserver( RemoteObserver o);21 public void removeObserver(String ref) throws RemoteException; 21 22 22 23 @SyncMethod(timeout = 1000) -
trunk/src/test/java/omq/test/observer/RemoteSubjectImpl.java
r71 r72 4 4 import java.util.List; 5 5 6 import omq.common.broker.Broker; 7 import omq.exception.RemoteException; 6 8 import omq.server.RemoteObject; 7 9 … … 13 15 private static final long serialVersionUID = 1L; 14 16 private String state; 15 private List<RemoteObserver> list = new ArrayList<RemoteObserver>(); 17 private Broker broker; 18 private List<RemoteObserver> list; 16 19 17 @Override18 public void addObserver(RemoteObserver o) {19 list .add(o);20 public RemoteSubjectImpl(Broker broker) { 21 this.broker = broker; 22 list = new ArrayList<RemoteObserver>(); 20 23 } 21 24 22 25 @Override 23 public void removeObserver(RemoteObserver o) { 24 list.remove(o); 26 public void addObserver(String ref) throws RemoteException { 27 RemoteObserver obs = broker.lookup(ref, RemoteObserver.class); 28 list.add(obs); 29 } 30 31 @Override 32 public void removeObserver(String ref) throws RemoteException { 33 RemoteObserver obs = broker.lookup(ref, RemoteObserver.class); 34 list.remove(obs); 25 35 } 26 36
Note: See TracChangeset
for help on using the changeset viewer.