Changeset 53 for trunk/src/main/java/omq/common/broker/Broker.java
- Timestamp:
- 06/20/13 16:57:39 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/common/broker/Broker.java
r50 r53 35 35 private static final Logger logger = Logger.getLogger(Broker.class.getName()); 36 36 37 private static Connection connection; 38 private static Channel channel; 39 private static boolean clientStarted = false; 40 private static boolean connectionClosed = false; 41 private static Properties environment = null; 42 // TODO ask Pedro if it can be only one object in the map (an object can 43 // have multiple threads in the same broker -see environment-) 44 private static Map<String, RemoteObject> remoteObjs; 45 46 /** 47 * Initializes a new Broker with the environment called by reference 48 * 49 * @param env 50 * @throws Exception 51 */ 52 public static synchronized void initBroker(Properties env) throws Exception { 53 if (environment == null) { 54 55 // Load log4j configuration 56 URL log4jResource = Broker.class.getResource("/log4j.xml"); 57 DOMConfigurator.configure(log4jResource); 58 59 remoteObjs = new HashMap<String, RemoteObject>(); 60 environment = env; 61 connection = OmqConnectionFactory.getNewConnection(env); 62 channel = connection.createChannel(); 63 addFaultTolerance(); 64 try { 65 tryConnection(env); 66 } catch (Exception e) { 67 channel.close(); 68 connection.close(); 69 throw new InitBrokerException("The connection didn't work"); 70 } 71 } else { 72 logger.error("Broker is already started"); 73 throw new InitBrokerException("Broker is already started"); 74 } 75 } 76 77 public static void stopBroker() throws Exception { 37 private Connection connection; 38 private Channel channel; 39 private ResponseListener responseListener; 40 private EventDispatcher eventDispatcher; 41 private Serializer serializer; 42 private boolean clientStarted = false; 43 private boolean connectionClosed = false; 44 private Properties environment = null; 45 private Map<String, RemoteObject> remoteObjs; 46 47 public Broker(Properties env) throws Exception { 48 // Load log4j configuration 49 URL log4jResource = Broker.class.getResource("/log4j.xml"); 50 DOMConfigurator.configure(log4jResource); 51 52 remoteObjs = new HashMap<String, RemoteObject>(); 53 serializer = new Serializer(env); 54 environment = env; 55 connection = OmqConnectionFactory.getNewConnection(env); 56 channel = connection.createChannel(); 57 addFaultTolerance(); 58 try { 59 tryConnection(env); 60 } catch (Exception e) { 61 channel.close(); 62 connection.close(); 63 throw new InitBrokerException("The connection didn't work"); 64 } 65 } 66 67 public void stopBroker() throws Exception { 78 68 logger.warn("Stopping broker"); 79 69 // Stop the client 80 70 if (clientStarted) { 81 ResponseListener.stopResponseListner();82 EventDispatcher.stopEventDispatcher();71 responseListener.kill(); 72 eventDispatcher.kill(); 83 73 Proxymq.stopProxy(); 84 74 } … … 95 85 environment = null; 96 86 remoteObjs = null; 97 Serializer.removeSerializers();87 // Serializer.removeSerializers(); 98 88 } 99 89 … … 102 92 * @throws Exception 103 93 */ 104 public staticConnection getConnection() throws Exception {94 public Connection getConnection() throws Exception { 105 95 return connection; 106 96 } 107 97 108 public staticvoid closeConnection() throws IOException {98 public void closeConnection() throws IOException { 109 99 logger.warn("Clossing connection"); 110 100 connectionClosed = true; … … 118 108 * @throws Exception 119 109 */ 120 public staticChannel getChannel() throws Exception {110 public Channel getChannel() throws Exception { 121 111 return channel; 122 112 } … … 128 118 * @throws IOException 129 119 */ 130 public staticChannel getNewChannel() throws IOException {120 public Channel getNewChannel() throws IOException { 131 121 return connection.createChannel(); 132 122 } 133 123 134 124 @SuppressWarnings("unchecked") 135 public static<T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException {125 public <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException { 136 126 try { 137 127 … … 142 132 143 133 if (!Proxymq.containsProxy(reference)) { 144 Proxymq proxy = new Proxymq(reference, contract, environment);134 Proxymq proxy = new Proxymq(reference, contract, this); 145 135 Class<?>[] array = { contract }; 146 136 return (T) Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy); … … 153 143 } 154 144 155 public staticvoid bind(String reference, RemoteObject remote) throws RemoteException {145 public void bind(String reference, RemoteObject remote) throws RemoteException { 156 146 try { 157 remote.startRemoteObject(reference, environment);147 remote.startRemoteObject(reference, this); 158 148 remoteObjs.put(reference, remote); 159 149 } catch (Exception e) { … … 162 152 } 163 153 164 public staticvoid unbind(String reference) throws RemoteException, IOException {154 public void unbind(String reference) throws RemoteException, IOException { 165 155 if (remoteObjs.containsKey(reference)) { 166 156 RemoteObject remote = remoteObjs.get(reference); … … 183 173 * @throws Exception 184 174 */ 185 private s tatic synchronized void initClient(Properties environment) throws Exception {186 if ( ResponseListener.isVoid()) {187 ResponseListener.init(environment);188 } 189 if ( EventDispatcher.isVoid()) {190 EventDispatcher.init(environment);175 private synchronized void initClient(Properties environment) throws Exception { 176 if (responseListener == null) { 177 responseListener = new ResponseListener(this); 178 } 179 if (eventDispatcher == null) { 180 eventDispatcher = new EventDispatcher(this); 191 181 } 192 182 } … … 199 189 * @throws SerializerException 200 190 */ 201 public staticvoid trigger(Event event) throws IOException, SerializerException {191 public void trigger(Event event) throws IOException, SerializerException { 202 192 String UID = event.getTopic(); 203 193 EventWrapper wrapper = new EventWrapper(event); … … 205 195 channel.exchangeDeclare(UID, "fanout"); 206 196 207 byte[] bytesResponse = Serializer.serialize(wrapper);197 byte[] bytesResponse = serializer.serialize(wrapper); 208 198 channel.basicPublish(UID, "", null, bytesResponse); 209 199 … … 218 208 * @throws Exception 219 209 */ 220 public staticvoid tryConnection(Properties env) throws Exception {210 public void tryConnection(Properties env) throws Exception { 221 211 Channel channel = connection.createChannel(); 222 212 String message = "ping"; … … 252 242 * have the listener. 253 243 */ 254 private staticvoid addFaultTolerance() {244 private void addFaultTolerance() { 255 245 connection.addShutdownListener(new ShutdownListener() { 256 246 @Override … … 287 277 } 288 278 289 public staticProperties getEnvironment() {279 public Properties getEnvironment() { 290 280 return environment; 291 281 } 292 282 283 public ResponseListener getResponseListener() { 284 return responseListener; 285 } 286 287 public EventDispatcher getEventDispatcher() { 288 return eventDispatcher; 289 } 290 291 public Serializer getSerializer() { 292 return serializer; 293 } 293 294 }
Note: See TracChangeset
for help on using the changeset viewer.