source: branches/supervisor/src/main/java/omq/supervisor/SupervisorImpl.java @ 106

Last change on this file since 106 was 106, checked in by stoda, 11 years ago

abans que la segueixi liant...

File size: 4.9 KB
Line 
1package omq.supervisor;
2
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.List;
6import java.util.Map;
7import java.util.Set;
8
9import omq.common.broker.HasObject;
10import omq.common.broker.RemoteBroker;
11import omq.common.broker.RemoteMultiBroker;
12import omq.exception.RemoteException;
13import omq.server.RemoteObject;
14
15import org.apache.log4j.Logger;
16
17import com.rabbitmq.client.Channel;
18import com.rabbitmq.client.AMQP.Queue.DeclareOk;
19
20public class SupervisorImpl extends RemoteObject implements Supervisor, Runnable {
21
22        /**
23         *
24         */
25        private static final long serialVersionUID = 1L;
26        private static final Logger logger = Logger.getLogger(SupervisorImpl.class.getName());
27
28        private String brokerSet;
29        private long sleep;
30        private Map<String, OmqSettings> objectSettings;
31        // TODO: Set<?>
32        private RemoteMultiBroker multiBroker;
33        private Map<String, RemoteBroker> brokerMap;
34        private List<RemoteBroker> brokers;
35
36        public SupervisorImpl(String brokerSet, long sleep) {
37                this.brokerSet = brokerSet;
38                this.sleep = sleep;
39                brokers = new ArrayList<RemoteBroker>();
40                objectSettings = new HashMap<String, OmqSettings>();
41        }
42
43        @Override
44        public void run() {
45                try {
46                        multiBroker = getBroker().lookup(brokerSet, RemoteMultiBroker.class);
47                        while (true) {
48                                try {
49                                        Set<String> keys = objectSettings.keySet();
50                                        for (String reference : keys) {
51                                                System.out.println("key = " + keys);
52                                                checkObject(reference);
53                                        }
54                                } catch (Exception e) {
55                                        e.printStackTrace();
56                                }
57
58                                try {
59                                        Thread.sleep(sleep);
60                                } catch (InterruptedException e) {
61                                        e.printStackTrace();
62                                }
63                        }
64                } catch (RemoteException e1) {
65                        // TODO Auto-generated catch block
66                        e1.printStackTrace();
67                }
68        }
69
70        @Override
71        public void subscribe(String brokerSet, String brokerName) throws Exception {
72                if (brokerSet.equals(brokerSet) && !brokerMap.containsKey(brokerName)) {
73                        logger.info("Broker " + brokerName + " subscrived");
74                        RemoteBroker broker = getBroker().lookup(brokerName, RemoteBroker.class);
75                        brokerMap.put(brokerSet, broker);
76                } else {
77                        throw new Exception("blablabla");
78                }
79        }
80
81        @Override
82        public void spawnObject(OmqSettings settings) throws Exception {
83                String reference = settings.getReference();
84
85                if (objectSettings.containsKey(reference)) {
86                        throw new Exception("JAJAJAJAJA");
87                }
88                objectSettings.put(reference, settings);
89        }
90
91        @Override
92        public void spawnObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception {
93
94                String reference = settings.getReference();
95
96                if (!objectSettings.containsKey(reference)) {
97                        objectSettings.put(reference, settings);
98                }
99
100                int minObjects = settings.getMinNumberObjects();
101
102                for (HasObject h : hasList) {
103                        if (h.hasObject() && minObjects >= numObjects) {
104                                brokerMap.get(h.getBrokerName()).spawnObject(reference, settings.getClassName(), settings.getProps());
105                                numObjects++;
106                                if (minObjects >= numObjects) {
107                                        break;
108                                }
109                        }
110                }
111
112        }
113
114        @Override
115        public void unbindObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception {
116                String reference = settings.getReference();
117
118                int minObjects = settings.getMinNumberObjects();
119
120                for (RemoteBroker broker : brokers) {
121                        if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) {
122                                broker.deleteObject(reference);
123                                break;
124                        }
125                }
126
127        }
128
129        private void checkObject(String reference) throws Exception {
130                OmqSettings settings = objectSettings.get(reference);
131
132                int minObjects = settings.getMinNumberObjects();
133                int maxMessages = settings.getMaxNumQueued();
134                int minMessages = settings.getMinNumQueued();
135
136                Channel channel = getBroker().getChannel();
137                DeclareOk dok = channel.queueDeclarePassive(reference);
138
139                int numObjects = 0;
140                int numMessages = dok.getMessageCount();
141
142                HasObject[] hasList = multiBroker.hasObjectInfo(reference);
143                for (HasObject h : hasList) {
144                        if (h.hasObject()) {
145                                numObjects++;
146                        }
147                }
148
149                System.out.println("Num Consumers: " + numObjects + ", num Messages: " + numMessages);
150
151                if (maxMessages < numMessages || numObjects < minObjects) {
152                        logger.info("SPAWN TIME!!");
153                        spawnObject(settings, hasList, numObjects);
154                } else if (numMessages < minMessages && minObjects < numObjects) {
155                        logger.info("Unbinding object!!!");
156                        unbindObject(settings, hasList, numObjects);
157                }
158        }
159
160        public Map<String, OmqSettings> getObjectSettings() {
161                return objectSettings;
162        }
163
164        public void setObjectSettings(Map<String, OmqSettings> objectSettings) {
165                this.objectSettings = objectSettings;
166        }
167
168        public List<RemoteBroker> getBrokers() {
169                return brokers;
170        }
171
172        public void setBrokers(List<RemoteBroker> brokers) {
173                this.brokers = brokers;
174        }
175
176        public String getBrokerSet() {
177                return brokerSet;
178        }
179
180        public void setBrokerSet(String brokerSet) {
181                this.brokerSet = brokerSet;
182        }
183
184        public RemoteMultiBroker getMultiBroker() {
185                return multiBroker;
186        }
187
188        public void setMultiBroker(RemoteMultiBroker multiBroker) {
189                this.multiBroker = multiBroker;
190        }
191
192}
Note: See TracBrowser for help on using the repository browser.