source: branches/supervisor/src/main/java/omq/supervisor/SupervisorThread.java @ 93

Last change on this file since 93 was 93, checked in by stoda, 11 years ago

TODO: unbind object

File size: 1.8 KB
Line 
1package omq.supervisor;
2
3import java.util.Map;
4import java.util.Set;
5
6import com.rabbitmq.client.AMQP.Queue.DeclareOk;
7import com.rabbitmq.client.Channel;
8
9public class SupervisorThread extends Thread {
10
11        private long sleep;
12        private SupervisorImpl supervisor;
13        private Map<String, OmqSettings> objectSettings;
14
15        public SupervisorThread(SupervisorImpl supervisor, long sleep) {
16                this.sleep = sleep;
17                this.supervisor = supervisor;
18                this.objectSettings = supervisor.getObjectSettings();
19        }
20
21        @Override
22        public void run() {
23                while (true) {
24                        try {
25                                Set<String> keys = objectSettings.keySet();
26                                for (String reference : keys) {
27                                        checkObject(reference);
28                                }
29                        } catch (Exception e) {
30                                e.printStackTrace();
31                        }
32
33                        try {
34                                Thread.sleep(sleep);
35                        } catch (InterruptedException e) {
36                                e.printStackTrace();
37                        }
38                }
39        }
40
41        private void checkObject(String reference) throws Exception {
42                OmqSettings settings = objectSettings.get(reference);
43
44                int minObjects = settings.getMinNumberObjects();
45                int maxMessages = settings.getMaxNumQueued();
46                int minMessages = settings.getMinNumQueued();
47
48                // TODO treure merda...
49                Channel channel = supervisor.getBroker().getChannel();
50                DeclareOk dok = channel.queueDeclarePassive(reference);
51
52                int numConsumers = dok.getConsumerCount();
53                int numMessages = dok.getMessageCount();
54
55                System.out.println("Num Consumers: " + numConsumers + ", num Messages: " + numMessages);
56
57                if (maxMessages < numMessages || minObjects < numConsumers) {
58                        System.out.println("SPAWN TIME!!");
59                        supervisor.spawnObject(settings);
60                        // spawn:
61                        // pregunta a tots i qui no té l'objecte li poses
62                } else if (numMessages < minMessages && minObjects > numConsumers) {
63                        supervisor.unbindObject(settings);
64                        // delete:
65                        // pregunta a tots i qui té l'objecte li treus
66                }
67        }
68}
Note: See TracBrowser for help on using the repository browser.