Changeset 53 for trunk/src/main/java/omq/common
- Timestamp:
- 06/20/13 16:57:39 (11 years ago)
- Location:
- trunk/src/main/java/omq/common
- Files:
-
- 1 deleted
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/common/broker/Broker.java
r50 r53 35 35 private static final Logger logger = Logger.getLogger(Broker.class.getName()); 36 36 37 private static Connection connection; 38 private static Channel channel; 39 private static boolean clientStarted = false; 40 private static boolean connectionClosed = false; 41 private static Properties environment = null; 42 // TODO ask Pedro if it can be only one object in the map (an object can 43 // have multiple threads in the same broker -see environment-) 44 private static Map<String, RemoteObject> remoteObjs; 45 46 /** 47 * Initializes a new Broker with the environment called by reference 48 * 49 * @param env 50 * @throws Exception 51 */ 52 public static synchronized void initBroker(Properties env) throws Exception { 53 if (environment == null) { 54 55 // Load log4j configuration 56 URL log4jResource = Broker.class.getResource("/log4j.xml"); 57 DOMConfigurator.configure(log4jResource); 58 59 remoteObjs = new HashMap<String, RemoteObject>(); 60 environment = env; 61 connection = OmqConnectionFactory.getNewConnection(env); 62 channel = connection.createChannel(); 63 addFaultTolerance(); 64 try { 65 tryConnection(env); 66 } catch (Exception e) { 67 channel.close(); 68 connection.close(); 69 throw new InitBrokerException("The connection didn't work"); 70 } 71 } else { 72 logger.error("Broker is already started"); 73 throw new InitBrokerException("Broker is already started"); 74 } 75 } 76 77 public static void stopBroker() throws Exception { 37 private Connection connection; 38 private Channel channel; 39 private ResponseListener responseListener; 40 private EventDispatcher eventDispatcher; 41 private Serializer serializer; 42 private boolean clientStarted = false; 43 private boolean connectionClosed = false; 44 private Properties environment = null; 45 private Map<String, RemoteObject> remoteObjs; 46 47 public Broker(Properties env) throws Exception { 48 // Load log4j configuration 49 URL log4jResource = Broker.class.getResource("/log4j.xml"); 50 DOMConfigurator.configure(log4jResource); 51 52 remoteObjs = new HashMap<String, RemoteObject>(); 53 serializer = new Serializer(env); 54 environment = env; 55 connection = OmqConnectionFactory.getNewConnection(env); 56 channel = connection.createChannel(); 57 addFaultTolerance(); 58 try { 59 tryConnection(env); 60 } catch (Exception e) { 61 channel.close(); 62 connection.close(); 63 throw new InitBrokerException("The connection didn't work"); 64 } 65 } 66 67 public void stopBroker() throws Exception { 78 68 logger.warn("Stopping broker"); 79 69 // Stop the client 80 70 if (clientStarted) { 81 ResponseListener.stopResponseListner();82 EventDispatcher.stopEventDispatcher();71 responseListener.kill(); 72 eventDispatcher.kill(); 83 73 Proxymq.stopProxy(); 84 74 } … … 95 85 environment = null; 96 86 remoteObjs = null; 97 Serializer.removeSerializers();87 // Serializer.removeSerializers(); 98 88 } 99 89 … … 102 92 * @throws Exception 103 93 */ 104 public staticConnection getConnection() throws Exception {94 public Connection getConnection() throws Exception { 105 95 return connection; 106 96 } 107 97 108 public staticvoid closeConnection() throws IOException {98 public void closeConnection() throws IOException { 109 99 logger.warn("Clossing connection"); 110 100 connectionClosed = true; … … 118 108 * @throws Exception 119 109 */ 120 public staticChannel getChannel() throws Exception {110 public Channel getChannel() throws Exception { 121 111 return channel; 122 112 } … … 128 118 * @throws IOException 129 119 */ 130 public staticChannel getNewChannel() throws IOException {120 public Channel getNewChannel() throws IOException { 131 121 return connection.createChannel(); 132 122 } 133 123 134 124 @SuppressWarnings("unchecked") 135 public static<T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {125 public <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException { 136 126 try { 137 127 … … 142 132 143 133 if (!Proxymq.containsProxy(reference)) { 144 Proxymq proxy = new Proxymq(reference, contract, environment);134 Proxymq proxy = new Proxymq(reference, contract, this); 145 135 Class<?>[] array = { contract }; 146 136 return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy); … … 153 143 } 154 144 155 public staticvoid bind(String reference, RemoteObject remote) throws RemoteException {145 public void bind(String reference, RemoteObject remote) throws RemoteException { 156 146 try { 157 remote.startRemoteObject(reference, environment);147 remote.startRemoteObject(reference, this); 158 148 remoteObjs.put(reference, remote); 159 149 } catch (Exception e) { … … 162 152 } 163 153 164 public staticvoid unbind(String reference) throws RemoteException, IOException {154 public void unbind(String reference) throws RemoteException, IOException { 165 155 if (remoteObjs.containsKey(reference)) { 166 156 RemoteObject remote = remoteObjs.get(reference); … … 183 173 * @throws Exception 184 174 */ 185 private s tatic synchronized void initClient(Properties environment) throws Exception {186 if ( ResponseListener.isVoid()) {187 ResponseListener.init(environment);188 } 189 if ( EventDispatcher.isVoid()) {190 EventDispatcher.init(environment);175 private synchronized void initClient(Properties environment) throws Exception { 176 if (responseListener == null) { 177 responseListener = new ResponseListener(this); 178 } 179 if (eventDispatcher == null) { 180 eventDispatcher = new EventDispatcher(this); 191 181 } 192 182 } … … 199 189 * @throws SerializerException 200 190 */ 201 public staticvoid trigger(Event event) throws IOException, SerializerException {191 public void trigger(Event event) throws IOException, SerializerException { 202 192 String UID = event.getTopic(); 203 193 EventWrapper wrapper = new EventWrapper(event); … … 205 195 channel.exchangeDeclare(UID, "fanout"); 206 196 207 byte[] bytesResponse = Serializer.serialize(wrapper);197 byte[] bytesResponse = serializer.serialize(wrapper); 208 198 channel.basicPublish(UID, "", null, bytesResponse); 209 199 … … 218 208 * @throws Exception 219 209 */ 220 public staticvoid tryConnection(Properties env) throws Exception {210 public void tryConnection(Properties env) throws Exception { 221 211 Channel channel = connection.createChannel(); 222 212 String message = "ping"; … … 252 242 * have the listener. 253 243 */ 254 private staticvoid addFaultTolerance() {244 private void addFaultTolerance() { 255 245 connection.addShutdownListener(new ShutdownListener() { 256 246 @Override … … 287 277 } 288 278 289 public staticProperties getEnvironment() {279 public Properties getEnvironment() { 290 280 return environment; 291 281 } 292 282 283 public ResponseListener getResponseListener() { 284 return responseListener; 285 } 286 287 public EventDispatcher getEventDispatcher() { 288 return eventDispatcher; 289 } 290 291 public Serializer getSerializer() { 292 return serializer; 293 } 293 294 } -
trunk/src/main/java/omq/common/event/EventDispatcher.java
r49 r53 29 29 public class EventDispatcher extends Thread { 30 30 private static final Logger logger = Logger.getLogger(EventDispatcher.class.getName()); 31 private static EventDispatcher dispatcher;32 31 32 private Broker broker; 33 private Serializer serializer; 33 34 private Map<String, Vector<EventListener>> listeners; 34 35 private Channel channel; … … 37 38 private boolean killed = false; 38 39 39 private EventDispatcher(Properties env) throws Exception { 40 this.env = env; 40 public EventDispatcher(Broker broker) throws Exception { 41 this.broker = broker; 42 env = broker.getEnvironment(); 43 serializer = broker.getSerializer(); 41 44 42 45 // Declare the listeners map … … 44 47 45 48 startEventQueue(); 46 47 49 } 48 50 49 51 private void startEventQueue() throws Exception { 50 52 // Get a new connection and a new channel 51 channel = Broker.getNewChannel();53 channel = broker.getNewChannel(); 52 54 53 55 String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE); … … 60 62 } 61 63 62 public static void init(Properties env) throws Exception { 63 if (dispatcher == null) { 64 dispatcher = new EventDispatcher(env); 65 dispatcher.start(); 66 } else { 67 throw new Exception("Already initialized"); 68 } 69 } 70 71 public static void stopEventDispatcher() throws Exception { 64 public void kill() throws Exception { 72 65 logger.warn("Stopping EventDispatcher"); 73 dispatcher.setListeners(null); 74 dispatcher.killed = true; 75 dispatcher.interrupt(); 76 dispatcher.channel.close(); 77 dispatcher = null; 78 } 79 80 public static EventDispatcher getDispatcher(Properties env) throws Exception { 81 if (dispatcher == null) { 82 dispatcher = new EventDispatcher(env); 83 dispatcher.start(); 84 } 85 return dispatcher; 86 } 87 88 public static EventDispatcher getDispatcher() throws Exception { 89 if (dispatcher == null) { 90 throw new Exception("EventDispatcher not initialized"); 91 } 92 return dispatcher; 66 setListeners(null); 67 killed = true; 68 interrupt(); 69 channel.close(); 93 70 } 94 71 … … 104 81 105 82 // Get the event 106 event = Serializer.deserializeEvent(delivery.getBody());83 event = serializer.deserializeEvent(delivery.getBody()); 107 84 108 85 logger.info("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId()); … … 210 187 } 211 188 212 public static boolean isVoid() {213 return dispatcher == null;214 }215 216 189 } -
trunk/src/main/java/omq/common/util/Serializer.java
r49 r53 4 4 import java.util.Properties; 5 5 6 import org.apache.log4j.Logger;7 8 import omq.common.broker.Broker;9 6 import omq.common.event.Event; 10 7 import omq.common.message.Request; … … 23 20 */ 24 21 public class Serializer { 25 private static final Logger logger = Logger.getLogger(Serializer.class.getName()); 26 public static String kryo = "kryo"; 27 public static String java = "java"; 28 public static String gson = "gson"; 22 // private static final Logger logger = 23 // Logger.getLogger(Serializer.class.getName()); 24 public static final String kryo = "kryo"; 25 public static final String java = "java"; 26 public static final String gson = "gson"; 29 27 30 28 // Client serializer 31 public staticISerializer serializer;29 public ISerializer serializer; 32 30 33 31 // Server serializers 34 private staticISerializer kryoSerializer;35 private staticISerializer javaSerializer;36 private staticISerializer gsonSerializer;32 private ISerializer kryoSerializer; 33 private ISerializer javaSerializer; 34 private ISerializer gsonSerializer; 37 35 38 private static Boolean getEnableCompression() { 39 Properties env = Broker.getEnvironment(); 36 private Properties env; 37 38 public Serializer(Properties env) { 39 this.env = env; 40 } 41 42 private Boolean getEnableCompression() { 40 43 return Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false")); 41 44 } 42 45 43 public staticISerializer getInstance() throws SerializerException {46 public ISerializer getInstance() throws SerializerException { 44 47 if (serializer == null) { 45 48 try { 46 Properties env = Broker.getEnvironment();47 49 String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java); 48 50 … … 60 62 } 61 63 62 public staticISerializer getInstance(String type) throws SerializerException {64 public ISerializer getInstance(String type) throws SerializerException { 63 65 if (kryo.equals(type)) { 64 66 if (kryoSerializer == null) { … … 79 81 } 80 82 81 public staticbyte[] serialize(String type, Object obj) throws SerializerException {83 public byte[] serialize(String type, Object obj) throws SerializerException { 82 84 ISerializer instance = getInstance(type); 83 85 … … 96 98 97 99 // TODO: remove this function and think about the event serialization 98 public staticbyte[] serialize(Object obj) throws SerializerException {100 public byte[] serialize(Object obj) throws SerializerException { 99 101 ISerializer instance = getInstance(); 100 102 … … 112 114 } 113 115 114 public staticRequest deserializeRequest(String type, byte[] bytes, RemoteObject obj) throws SerializerException {116 public Request deserializeRequest(String type, byte[] bytes, RemoteObject obj) throws SerializerException { 115 117 ISerializer instance = getInstance(type); 116 118 … … 128 130 } 129 131 130 public staticResponse deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException {132 public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException { 131 133 ISerializer instance = getInstance(); 132 134 … … 144 146 } 145 147 146 public staticEvent deserializeEvent(byte[] bytes) throws SerializerException {148 public Event deserializeEvent(byte[] bytes) throws SerializerException { 147 149 ISerializer instance = getInstance(); 148 150 … … 160 162 } 161 163 162 public static void removeSerializers() {163 164 165 166 167 168 }164 // public static void removeSerializers() { 165 // logger.warn("Removing serializers"); 166 // serializer = null; 167 // kryoSerializer = null; 168 // javaSerializer = null; 169 // gsonSerializer = null; 170 // } 169 171 }
Note: See TracChangeset
for help on using the changeset viewer.