Changeset 53 for trunk/src/main/java
- Timestamp:
- 06/20/13 16:57:39 (11 years ago)
- Location:
- trunk/src/main/java/omq
- Files:
-
- 1 deleted
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/client/listener/ResponseListener.java
r50 r53 28 28 */ 29 29 public class ResponseListener extends Thread { 30 private static final Logger logger = Logger.getLogger(ResponseListener.class.getName()); 31 private static ResponseListener rListener; 30 private final Logger logger = Logger.getLogger(ResponseListener.class.getName()); 32 31 32 private Broker broker; 33 33 private Channel channel; 34 34 private QueueingConsumer consumer; … … 43 43 * @throws Exception 44 44 */ 45 protected ResponseListener(Properties env) throws Exception { 46 this.env = env; 45 public ResponseListener(Broker broker) throws Exception { 46 this.broker = broker; 47 env = broker.getEnvironment(); 47 48 48 49 // Init the hashtable (it's concurrent) 49 this.results = new Hashtable<String, Map<String, byte[]>>();50 results = new Hashtable<String, Map<String, byte[]>>(); 50 51 51 52 startRPCQueue(); … … 107 108 108 109 private void startRPCQueue() throws Exception { 109 channel = Broker.getNewChannel();110 channel = broker.getNewChannel(); 110 111 111 112 Map<String, Object> args = null; … … 128 129 129 130 /** 130 * Static function which initializes the ResponseListener131 *132 * @param env133 * @throws Exception134 */135 public static void init(Properties env) throws Exception {136 if (rListener == null) {137 rListener = new ResponseListener(env);138 rListener.start();139 } else {140 throw new Exception("Cannot init because it already exists");141 }142 }143 144 /**145 * Method to retrieve the unique ResponseListener, this function can also146 * initialize a ResponseListener using and environment147 *148 * @param env149 * @return unique ResponseListener150 * @throws Exception151 */152 public static ResponseListener getRequestListener(Properties env) throws Exception {153 if (rListener == null) {154 rListener = new ResponseListener(env);155 rListener.start();156 } else {157 // TODO: create a new exception to indicate that a response listener158 // cannot be init159 throw new Exception("Cannot init because it already exists");160 }161 return rListener;162 }163 164 public static boolean isVoid() {165 return rListener == null;166 }167 168 /**169 * Method to retrieve the unique ResponseListener170 *171 * @return172 * @throws Exception173 */174 public static ResponseListener getRequestListener() throws Exception {175 if (rListener == null) {176 throw new Exception("Request listener not initialized");177 }178 return rListener;179 }180 181 /**182 131 * 183 132 * @param key … … 186 135 public boolean containsKey(String key) { 187 136 return results.containsKey(key); 188 }189 190 /**191 * This method is used to kill the unique responseListener in the system192 *193 * @throws Exception194 */195 public static void stopResponseListner() throws Exception {196 rListener.kill();197 rListener = null;198 137 } 199 138 -
trunk/src/main/java/omq/client/proxy/Proxymq.java
r49 r53 51 51 private String uid; 52 52 private transient String serializerType; 53 private transient Broker broker; 53 54 private transient ResponseListener rListener; 54 55 private transient EventDispatcher dispatcher; 56 private transient Serializer serializer; 55 57 // private transient Channel channel; 56 58 private transient Properties env; … … 85 87 * @throws Exception 86 88 */ 87 public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception {89 public Proxymq(String uid, Class<?> clazz, Broker broker) throws Exception { 88 90 this.uid = uid; 89 this.rListener = ResponseListener.getRequestListener(); 90 this.dispatcher = EventDispatcher.getDispatcher(); 91 this.broker = broker; 92 rListener = broker.getResponseListener(); 93 dispatcher = broker.getEventDispatcher(); 94 serializer = broker.getSerializer(); 91 95 92 96 // TODO what is better to use a new channel or to use the same? 93 97 // this.channel = Broker.getChannel(); 94 this.env = env;98 env = broker.getEnvironment(); 95 99 96 100 // set the serializer type … … 160 164 161 165 // Publish the message 162 byte[] bytesRequest = Serializer.serialize(serializerType, request);166 byte[] bytesRequest = serializer.serialize(serializerType, request); 163 167 // TODO See this 164 168 // channel.basicPublish(exchange, routingkey, props, bytesRequest); 165 Broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);169 broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest); 166 170 // Log.saveLog("Client-Serialize", bytesRequest); 167 171 } … … 232 236 throw new TimeoutException("Timeout exception time: " + timeout); 233 237 } 234 resp = Serializer.deserializeResponse(results.get(corrId), type);238 resp = serializer.deserializeResponse(results.get(corrId), type); 235 239 // Log.saveLog("Client-Deserialize", results.get(corrId)); 236 240 -
trunk/src/main/java/omq/common/broker/Broker.java
r50 r53 35 35 private static final Logger logger = Logger.getLogger(Broker.class.getName()); 36 36 37 private static Connection connection; 38 private static Channel channel; 39 private static boolean clientStarted = false; 40 private static boolean connectionClosed = false; 41 private static Properties environment = null; 42 // TODO ask Pedro if it can be only one object in the map (an object can 43 // have multiple threads in the same broker -see environment-) 44 private static Map<String, RemoteObject> remoteObjs; 45 46 /** 47 * Initializes a new Broker with the environment called by reference 48 * 49 * @param env 50 * @throws Exception 51 */ 52 public static synchronized void initBroker(Properties env) throws Exception { 53 if (environment == null) { 54 55 // Load log4j configuration 56 URL log4jResource = Broker.class.getResource("/log4j.xml"); 57 DOMConfigurator.configure(log4jResource); 58 59 remoteObjs = new HashMap<String, RemoteObject>(); 60 environment = env; 61 connection = OmqConnectionFactory.getNewConnection(env); 62 channel = connection.createChannel(); 63 addFaultTolerance(); 64 try { 65 tryConnection(env); 66 } catch (Exception e) { 67 channel.close(); 68 connection.close(); 69 throw new InitBrokerException("The connection didn't work"); 70 } 71 } else { 72 logger.error("Broker is already started"); 73 throw new InitBrokerException("Broker is already started"); 74 } 75 } 76 77 public static void stopBroker() throws Exception { 37 private Connection connection; 38 private Channel channel; 39 private ResponseListener responseListener; 40 private EventDispatcher eventDispatcher; 41 private Serializer serializer; 42 private boolean clientStarted = false; 43 private boolean connectionClosed = false; 44 private Properties environment = null; 45 private Map<String, RemoteObject> remoteObjs; 46 47 public Broker(Properties env) throws Exception { 48 // Load log4j configuration 49 URL log4jResource = Broker.class.getResource("/log4j.xml"); 50 DOMConfigurator.configure(log4jResource); 51 52 remoteObjs = new HashMap<String, RemoteObject>(); 53 serializer = new Serializer(env); 54 environment = env; 55 connection = OmqConnectionFactory.getNewConnection(env); 56 channel = connection.createChannel(); 57 addFaultTolerance(); 58 try { 59 tryConnection(env); 60 } catch (Exception e) { 61 channel.close(); 62 connection.close(); 63 throw new InitBrokerException("The connection didn't work"); 64 } 65 } 66 67 public void stopBroker() throws Exception { 78 68 logger.warn("Stopping broker"); 79 69 // Stop the client 80 70 if (clientStarted) { 81 ResponseListener.stopResponseListner();82 EventDispatcher.stopEventDispatcher();71 responseListener.kill(); 72 eventDispatcher.kill(); 83 73 Proxymq.stopProxy(); 84 74 } … … 95 85 environment = null; 96 86 remoteObjs = null; 97 Serializer.removeSerializers();87 // Serializer.removeSerializers(); 98 88 } 99 89 … … 102 92 * @throws Exception 103 93 */ 104 public staticConnection getConnection() throws Exception {94 public Connection getConnection() throws Exception { 105 95 return connection; 106 96 } 107 97 108 public staticvoid closeConnection() throws IOException {98 public void closeConnection() throws IOException { 109 99 logger.warn("Clossing connection"); 110 100 connectionClosed = true; … … 118 108 * @throws Exception 119 109 */ 120 public staticChannel getChannel() throws Exception {110 public Channel getChannel() throws Exception { 121 111 return channel; 122 112 } … … 128 118 * @throws IOException 129 119 */ 130 public staticChannel getNewChannel() throws IOException {120 public Channel getNewChannel() throws IOException { 131 121 return connection.createChannel(); 132 122 } 133 123 134 124 @SuppressWarnings("unchecked") 135 public static<T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {125 public <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException { 136 126 try { 137 127 … … 142 132 143 133 if (!Proxymq.containsProxy(reference)) { 144 Proxymq proxy = new Proxymq(reference, contract, environment);134 Proxymq proxy = new Proxymq(reference, contract, this); 145 135 Class<?>[] array = { contract }; 146 136 return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy); … … 153 143 } 154 144 155 public staticvoid bind(String reference, RemoteObject remote) throws RemoteException {145 public void bind(String reference, RemoteObject remote) throws RemoteException { 156 146 try { 157 remote.startRemoteObject(reference, environment);147 remote.startRemoteObject(reference, this); 158 148 remoteObjs.put(reference, remote); 159 149 } catch (Exception e) { … … 162 152 } 163 153 164 public staticvoid unbind(String reference) throws RemoteException, IOException {154 public void unbind(String reference) throws RemoteException, IOException { 165 155 if (remoteObjs.containsKey(reference)) { 166 156 RemoteObject remote = remoteObjs.get(reference); … … 183 173 * @throws Exception 184 174 */ 185 private s tatic synchronized void initClient(Properties environment) throws Exception {186 if ( ResponseListener.isVoid()) {187 ResponseListener.init(environment);188 } 189 if ( EventDispatcher.isVoid()) {190 EventDispatcher.init(environment);175 private synchronized void initClient(Properties environment) throws Exception { 176 if (responseListener == null) { 177 responseListener = new ResponseListener(this); 178 } 179 if (eventDispatcher == null) { 180 eventDispatcher = new EventDispatcher(this); 191 181 } 192 182 } … … 199 189 * @throws SerializerException 200 190 */ 201 public staticvoid trigger(Event event) throws IOException, SerializerException {191 public void trigger(Event event) throws IOException, SerializerException { 202 192 String UID = event.getTopic(); 203 193 EventWrapper wrapper = new EventWrapper(event); … … 205 195 channel.exchangeDeclare(UID, "fanout"); 206 196 207 byte[] bytesResponse = Serializer.serialize(wrapper);197 byte[] bytesResponse = serializer.serialize(wrapper); 208 198 channel.basicPublish(UID, "", null, bytesResponse); 209 199 … … 218 208 * @throws Exception 219 209 */ 220 public staticvoid tryConnection(Properties env) throws Exception {210 public void tryConnection(Properties env) throws Exception { 221 211 Channel channel = connection.createChannel(); 222 212 String message = "ping"; … … 252 242 * have the listener. 253 243 */ 254 private staticvoid addFaultTolerance() {244 private void addFaultTolerance() { 255 245 connection.addShutdownListener(new ShutdownListener() { 256 246 @Override … … 287 277 } 288 278 289 public staticProperties getEnvironment() {279 public Properties getEnvironment() { 290 280 return environment; 291 281 } 292 282 283 public ResponseListener getResponseListener() { 284 return responseListener; 285 } 286 287 public EventDispatcher getEventDispatcher() { 288 return eventDispatcher; 289 } 290 291 public Serializer getSerializer() { 292 return serializer; 293 } 293 294 } -
trunk/src/main/java/omq/common/event/EventDispatcher.java
r49 r53 29 29 public class EventDispatcher extends Thread { 30 30 private static final Logger logger = Logger.getLogger(EventDispatcher.class.getName()); 31 private static EventDispatcher dispatcher;32 31 32 private Broker broker; 33 private Serializer serializer; 33 34 private Map<String, Vector<EventListener>> listeners; 34 35 private Channel channel; … … 37 38 private boolean killed = false; 38 39 39 private EventDispatcher(Properties env) throws Exception { 40 this.env = env; 40 public EventDispatcher(Broker broker) throws Exception { 41 this.broker = broker; 42 env = broker.getEnvironment(); 43 serializer = broker.getSerializer(); 41 44 42 45 // Declare the listeners map … … 44 47 45 48 startEventQueue(); 46 47 49 } 48 50 49 51 private void startEventQueue() throws Exception { 50 52 // Get a new connection and a new channel 51 channel = Broker.getNewChannel();53 channel = broker.getNewChannel(); 52 54 53 55 String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE); … … 60 62 } 61 63 62 public static void init(Properties env) throws Exception { 63 if (dispatcher == null) { 64 dispatcher = new EventDispatcher(env); 65 dispatcher.start(); 66 } else { 67 throw new Exception("Already initialized"); 68 } 69 } 70 71 public static void stopEventDispatcher() throws Exception { 64 public void kill() throws Exception { 72 65 logger.warn("Stopping EventDispatcher"); 73 dispatcher.setListeners(null); 74 dispatcher.killed = true; 75 dispatcher.interrupt(); 76 dispatcher.channel.close(); 77 dispatcher = null; 78 } 79 80 public static EventDispatcher getDispatcher(Properties env) throws Exception { 81 if (dispatcher == null) { 82 dispatcher = new EventDispatcher(env); 83 dispatcher.start(); 84 } 85 return dispatcher; 86 } 87 88 public static EventDispatcher getDispatcher() throws Exception { 89 if (dispatcher == null) { 90 throw new Exception("EventDispatcher not initialized"); 91 } 92 return dispatcher; 66 setListeners(null); 67 killed = true; 68 interrupt(); 69 channel.close(); 93 70 } 94 71 … … 104 81 105 82 // Get the event 106 event = Serializer.deserializeEvent(delivery.getBody());83 event = serializer.deserializeEvent(delivery.getBody()); 107 84 108 85 logger.info("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId()); … … 210 187 } 211 188 212 public static boolean isVoid() {213 return dispatcher == null;214 }215 216 189 } -
trunk/src/main/java/omq/common/util/Serializer.java
r49 r53 4 4 import java.util.Properties; 5 5 6 import org.apache.log4j.Logger;7 8 import omq.common.broker.Broker;9 6 import omq.common.event.Event; 10 7 import omq.common.message.Request; … … 23 20 */ 24 21 public class Serializer { 25 private static final Logger logger = Logger.getLogger(Serializer.class.getName()); 26 public static String kryo = "kryo"; 27 public static String java = "java"; 28 public static String gson = "gson"; 22 // private static final Logger logger = 23 // Logger.getLogger(Serializer.class.getName()); 24 public static final String kryo = "kryo"; 25 public static final String java = "java"; 26 public static final String gson = "gson"; 29 27 30 28 // Client serializer 31 public staticISerializer serializer;29 public ISerializer serializer; 32 30 33 31 // Server serializers 34 private staticISerializer kryoSerializer;35 private staticISerializer javaSerializer;36 private staticISerializer gsonSerializer;32 private ISerializer kryoSerializer; 33 private ISerializer javaSerializer; 34 private ISerializer gsonSerializer; 37 35 38 private static Boolean getEnableCompression() { 39 Properties env = Broker.getEnvironment(); 36 private Properties env; 37 38 public Serializer(Properties env) { 39 this.env = env; 40 } 41 42 private Boolean getEnableCompression() { 40 43 return Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false")); 41 44 } 42 45 43 public staticISerializer getInstance() throws SerializerException {46 public ISerializer getInstance() throws SerializerException { 44 47 if (serializer == null) { 45 48 try { 46 Properties env = Broker.getEnvironment();47 49 String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java); 48 50 … … 60 62 } 61 63 62 public staticISerializer getInstance(String type) throws SerializerException {64 public ISerializer getInstance(String type) throws SerializerException { 63 65 if (kryo.equals(type)) { 64 66 if (kryoSerializer == null) { … … 79 81 } 80 82 81 public staticbyte[] serialize(String type, Object obj) throws SerializerException {83 public byte[] serialize(String type, Object obj) throws SerializerException { 82 84 ISerializer instance = getInstance(type); 83 85 … … 96 98 97 99 // TODO: remove this function and think about the event serialization 98 public staticbyte[] serialize(Object obj) throws SerializerException {100 public byte[] serialize(Object obj) throws SerializerException { 99 101 ISerializer instance = getInstance(); 100 102 … … 112 114 } 113 115 114 public staticRequest deserializeRequest(String type, byte[] bytes, RemoteObject obj) throws SerializerException {116 public Request deserializeRequest(String type, byte[] bytes, RemoteObject obj) throws SerializerException { 115 117 ISerializer instance = getInstance(type); 116 118 … … 128 130 } 129 131 130 public staticResponse deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException {132 public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException { 131 133 ISerializer instance = getInstance(); 132 134 … … 144 146 } 145 147 146 public staticEvent deserializeEvent(byte[] bytes) throws SerializerException {148 public Event deserializeEvent(byte[] bytes) throws SerializerException { 147 149 ISerializer instance = getInstance(); 148 150 … … 160 162 } 161 163 162 public static void removeSerializers() {163 164 165 166 167 168 }164 // public static void removeSerializers() { 165 // logger.warn("Removing serializers"); 166 // serializer = null; 167 // kryoSerializer = null; 168 // javaSerializer = null; 169 // gsonSerializer = null; 170 // } 169 171 } -
trunk/src/main/java/omq/server/InvocationThread.java
r49 r53 23 23 private static final Logger logger = Logger.getLogger(InvocationThread.class.getName()); 24 24 private RemoteObject obj; 25 private transient Serializer serializer; 25 26 private BlockingQueue<Delivery> deliveryQueue; 26 27 private boolean killed = false; 27 28 28 public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue ) {29 public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue, Serializer serializer) { 29 30 this.obj = obj; 30 31 this.deliveryQueue = deliveryQueue; 32 this.serializer = serializer; 31 33 } 32 34 … … 41 43 42 44 // Deserialize the json 43 Request request = Serializer.deserializeRequest(serializerType, delivery.getBody(), obj);45 Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj); 44 46 // Log.saveLog("Server-Deserialize", delivery.getBody()); 45 47 … … 73 75 BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build(); 74 76 75 byte[] bytesResponse = Serializer.serialize(serializerType, resp);77 byte[] bytesResponse = serializer.serialize(serializerType, resp); 76 78 channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse); 77 79 -
trunk/src/main/java/omq/server/RemoteObject.java
r49 r53 39 39 private String UID; 40 40 private Properties env; 41 private transient Broker broker; 42 private transient Serializer serializer; 41 43 private transient RemoteWrapper remoteWrapper; 42 44 private transient Map<String, List<Class<?>>> params; … … 60 62 } 61 63 62 public void startRemoteObject(String reference, Properties env) throws Exception { 63 this.UID = reference; 64 this.env = env; 64 public void startRemoteObject(String reference, Broker broker) throws Exception { 65 this.broker = broker; 66 UID = reference; 67 env = broker.getEnvironment(); 68 serializer = broker.getSerializer(); 65 69 66 70 params = new HashMap<String, List<Class<?>>>(); … … 75 79 // Get num threads to use 76 80 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1")); 77 remoteWrapper = new RemoteWrapper(this, numThreads );81 remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer()); 78 82 79 83 startQueues(); … … 130 134 EventWrapper wrapper = new EventWrapper(event); 131 135 channel.exchangeDeclare(UID, "fanout"); 132 channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));136 channel.basicPublish(UID, "", null, serializer.serialize(wrapper)); 133 137 } 134 138 … … 221 225 222 226 // Start channel 223 channel = Broker.getNewChannel();227 channel = broker.getNewChannel(); 224 228 225 229 // Declares and bindings -
trunk/src/main/java/omq/server/RemoteWrapper.java
r49 r53 4 4 import java.util.concurrent.BlockingQueue; 5 5 import java.util.concurrent.LinkedBlockingDeque; 6 7 import omq.common.util.Serializer; 6 8 7 9 import org.apache.log4j.Logger; … … 23 25 private BlockingQueue<Delivery> deliveryQueue; 24 26 25 public RemoteWrapper(RemoteObject obj, int numThreads ) {27 public RemoteWrapper(RemoteObject obj, int numThreads, Serializer serializer) { 26 28 this.obj = obj; 27 29 this.numThreads = numThreads; … … 32 34 33 35 for (int i = 0; i < numThreads; i++) { 34 InvocationThread thread = new InvocationThread(obj, deliveryQueue );36 InvocationThread thread = new InvocationThread(obj, deliveryQueue, serializer); 35 37 invocationList.add(thread); 36 38 thread.start();
Note: See TracChangeset
for help on using the changeset viewer.