1 | package omq.supervisor; |
---|
2 | |
---|
3 | import java.util.ArrayList; |
---|
4 | import java.util.HashMap; |
---|
5 | import java.util.List; |
---|
6 | import java.util.Map; |
---|
7 | import java.util.Set; |
---|
8 | |
---|
9 | import omq.common.broker.HasObject; |
---|
10 | import omq.common.broker.RemoteBroker; |
---|
11 | import omq.common.broker.RemoteMultiBroker; |
---|
12 | import omq.exception.RemoteException; |
---|
13 | import omq.server.RemoteObject; |
---|
14 | |
---|
15 | import org.apache.log4j.Logger; |
---|
16 | |
---|
17 | import com.rabbitmq.client.Channel; |
---|
18 | import com.rabbitmq.client.AMQP.Queue.DeclareOk; |
---|
19 | |
---|
20 | public class SupervisorImpl extends RemoteObject implements Supervisor, Runnable { |
---|
21 | |
---|
22 | /** |
---|
23 | * |
---|
24 | */ |
---|
25 | private static final long serialVersionUID = 1L; |
---|
26 | private static final Logger logger = Logger.getLogger(SupervisorImpl.class.getName()); |
---|
27 | |
---|
28 | private String brokerSet; |
---|
29 | private long sleep; |
---|
30 | private Map<String, OmqSettings> objectSettings; |
---|
31 | // TODO: Set<?> |
---|
32 | private RemoteMultiBroker multiBroker; |
---|
33 | private Map<String, RemoteBroker> brokerMap; |
---|
34 | private List<RemoteBroker> brokers; |
---|
35 | |
---|
36 | public SupervisorImpl(String brokerSet, long sleep) { |
---|
37 | this.brokerSet = brokerSet; |
---|
38 | this.sleep = sleep; |
---|
39 | brokers = new ArrayList<RemoteBroker>(); |
---|
40 | objectSettings = new HashMap<String, OmqSettings>(); |
---|
41 | } |
---|
42 | |
---|
43 | @Override |
---|
44 | public void run() { |
---|
45 | try { |
---|
46 | multiBroker = getBroker().lookup(brokerSet, RemoteMultiBroker.class); |
---|
47 | while (true) { |
---|
48 | try { |
---|
49 | Set<String> keys = objectSettings.keySet(); |
---|
50 | for (String reference : keys) { |
---|
51 | System.out.println("key = " + keys); |
---|
52 | checkObject(reference); |
---|
53 | } |
---|
54 | } catch (Exception e) { |
---|
55 | e.printStackTrace(); |
---|
56 | } |
---|
57 | |
---|
58 | try { |
---|
59 | Thread.sleep(sleep); |
---|
60 | } catch (InterruptedException e) { |
---|
61 | e.printStackTrace(); |
---|
62 | } |
---|
63 | } |
---|
64 | } catch (RemoteException e1) { |
---|
65 | // TODO Auto-generated catch block |
---|
66 | e1.printStackTrace(); |
---|
67 | } |
---|
68 | } |
---|
69 | |
---|
70 | @Override |
---|
71 | public void subscribe(String brokerSet, String brokerName) throws Exception { |
---|
72 | if (brokerSet.equals(brokerSet) && !brokerMap.containsKey(brokerName)) { |
---|
73 | logger.info("Broker " + brokerName + " subscrived"); |
---|
74 | RemoteBroker broker = getBroker().lookup(brokerName, RemoteBroker.class); |
---|
75 | brokerMap.put(brokerSet, broker); |
---|
76 | } else { |
---|
77 | throw new Exception("blablabla"); |
---|
78 | } |
---|
79 | } |
---|
80 | |
---|
81 | @Override |
---|
82 | public void spawnObject(OmqSettings settings) throws Exception { |
---|
83 | String reference = settings.getReference(); |
---|
84 | |
---|
85 | if (objectSettings.containsKey(reference)) { |
---|
86 | throw new Exception("JAJAJAJAJA"); |
---|
87 | } |
---|
88 | objectSettings.put(reference, settings); |
---|
89 | } |
---|
90 | |
---|
91 | @Override |
---|
92 | public void spawnObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception { |
---|
93 | |
---|
94 | String reference = settings.getReference(); |
---|
95 | |
---|
96 | if (!objectSettings.containsKey(reference)) { |
---|
97 | objectSettings.put(reference, settings); |
---|
98 | } |
---|
99 | |
---|
100 | int minObjects = settings.getMinNumberObjects(); |
---|
101 | |
---|
102 | for (HasObject h : hasList) { |
---|
103 | if (h.hasObject() && minObjects >= numObjects) { |
---|
104 | brokerMap.get(h.getBrokerName()).spawnObject(reference, settings.getClassName(), settings.getProps()); |
---|
105 | numObjects++; |
---|
106 | if (minObjects >= numObjects) { |
---|
107 | break; |
---|
108 | } |
---|
109 | } |
---|
110 | } |
---|
111 | |
---|
112 | } |
---|
113 | |
---|
114 | @Override |
---|
115 | public void unbindObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception { |
---|
116 | String reference = settings.getReference(); |
---|
117 | |
---|
118 | int minObjects = settings.getMinNumberObjects(); |
---|
119 | |
---|
120 | for (RemoteBroker broker : brokers) { |
---|
121 | if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) { |
---|
122 | broker.deleteObject(reference); |
---|
123 | break; |
---|
124 | } |
---|
125 | } |
---|
126 | |
---|
127 | } |
---|
128 | |
---|
129 | private void checkObject(String reference) throws Exception { |
---|
130 | OmqSettings settings = objectSettings.get(reference); |
---|
131 | |
---|
132 | int minObjects = settings.getMinNumberObjects(); |
---|
133 | int maxMessages = settings.getMaxNumQueued(); |
---|
134 | int minMessages = settings.getMinNumQueued(); |
---|
135 | |
---|
136 | Channel channel = getBroker().getChannel(); |
---|
137 | DeclareOk dok = channel.queueDeclarePassive(reference); |
---|
138 | |
---|
139 | int numObjects = 0; |
---|
140 | int numMessages = dok.getMessageCount(); |
---|
141 | |
---|
142 | HasObject[] hasList = multiBroker.hasObjectInfo(reference); |
---|
143 | for (HasObject h : hasList) { |
---|
144 | if (h.hasObject()) { |
---|
145 | numObjects++; |
---|
146 | } |
---|
147 | } |
---|
148 | |
---|
149 | System.out.println("Num Consumers: " + numObjects + ", num Messages: " + numMessages); |
---|
150 | |
---|
151 | if (maxMessages < numMessages || numObjects < minObjects) { |
---|
152 | logger.info("SPAWN TIME!!"); |
---|
153 | spawnObject(settings, hasList, numObjects); |
---|
154 | } else if (numMessages < minMessages && minObjects < numObjects) { |
---|
155 | logger.info("Unbinding object!!!"); |
---|
156 | unbindObject(settings, hasList, numObjects); |
---|
157 | } |
---|
158 | } |
---|
159 | |
---|
160 | public Map<String, OmqSettings> getObjectSettings() { |
---|
161 | return objectSettings; |
---|
162 | } |
---|
163 | |
---|
164 | public void setObjectSettings(Map<String, OmqSettings> objectSettings) { |
---|
165 | this.objectSettings = objectSettings; |
---|
166 | } |
---|
167 | |
---|
168 | public List<RemoteBroker> getBrokers() { |
---|
169 | return brokers; |
---|
170 | } |
---|
171 | |
---|
172 | public void setBrokers(List<RemoteBroker> brokers) { |
---|
173 | this.brokers = brokers; |
---|
174 | } |
---|
175 | |
---|
176 | public String getBrokerSet() { |
---|
177 | return brokerSet; |
---|
178 | } |
---|
179 | |
---|
180 | public void setBrokerSet(String brokerSet) { |
---|
181 | this.brokerSet = brokerSet; |
---|
182 | } |
---|
183 | |
---|
184 | public RemoteMultiBroker getMultiBroker() { |
---|
185 | return multiBroker; |
---|
186 | } |
---|
187 | |
---|
188 | public void setMultiBroker(RemoteMultiBroker multiBroker) { |
---|
189 | this.multiBroker = multiBroker; |
---|
190 | } |
---|
191 | |
---|
192 | } |
---|