Changeset 100


Ignore:
Timestamp:
10/09/13 17:35:41 (11 years ago)
Author:
stoda
Message:

Idea of thread supervisor done

TODO test the idea, change the remote properties, etc

Location:
branches/supervisor
Files:
1 added
5 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/.project

    r84 r100  
    11<?xml version="1.0" encoding="UTF-8"?>
    22<projectDescription>
    3         <name>objectmq</name>
     3        <name>objectmq_supervisor</name>
    44        <comment></comment>
    55        <projects>
  • branches/supervisor/src/main/java/omq/common/broker/HasObject.java

    r91 r100  
    1212        private String reference;
    1313        private boolean hasObject;
     14        private int numThreads;
    1415
    15         public HasObject(String brokerName, String reference, boolean hasObject) {
     16        public HasObject(String brokerName, String reference, boolean hasObject, int numThreads) {
    1617                this.brokerName = brokerName;
    1718                this.reference = reference;
    1819                this.hasObject = hasObject;
     20                this.numThreads = numThreads;
    1921        }
    2022
     
    4345        }
    4446
     47        public int getNumThreads() {
     48                return numThreads;
     49        }
     50
     51        public void setNumThreads(int numThreads) {
     52                this.numThreads = numThreads;
     53        }
     54
    4555}
  • branches/supervisor/src/main/java/omq/server/InvocationThread.java

    r96 r100  
    3737        private String UID;
    3838        private Properties env;
     39        private boolean idle;
     40        private long lastExec;
     41
     42        private RemoteThreadPool pool; // TODO posar això bé
    3943
    4044        // Broker
     
    5458                this.broker = broker;
    5559                this.serializer = broker.getSerializer();
     60                this.lastExec = 0;
     61                this.idle = true;
    5662        }
    5763
     
    7379                                // Get the delivery
    7480                                Delivery delivery = consumer.nextDelivery();
     81
     82                                // This thread gets busy
     83                                pool.getBusy().incrementAndGet();
     84                                idle = false;
    7585
    7686                                String serializerType = delivery.getProperties().getType();
     
    112122
    113123                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     124
     125                                // The thread is now idle
     126                                lastExec = System.currentTimeMillis();
     127                                idle = true;
     128                                pool.getBusy().decrementAndGet();
     129
    114130                        } catch (InterruptedException i) {
    115131                                logger.error(i);
     
    231247        }
    232248
     249        public long getLastExecution() {
     250                return lastExec;
     251        }
     252
     253        public boolean isIdle() {
     254                return idle;
     255        }
     256
    233257}
  • branches/supervisor/src/main/java/omq/server/RemoteObject.java

    r96 r100  
    3333        private Properties env;
    3434        private transient Broker broker;
     35        private transient RemoteThreadPool pool;
    3536        private transient Map<String, List<Class<?>>> params;
    3637        private transient List<InvocationThread> invocationList;
  • branches/supervisor/src/main/java/omq/supervisor/SupervisorThread.java

    r94 r100  
    44import java.util.Set;
    55
     6import org.apache.log4j.Logger;
     7
    68import com.rabbitmq.client.AMQP.Queue.DeclareOk;
    79import com.rabbitmq.client.Channel;
    810
    911public class SupervisorThread extends Thread {
     12        private static final Logger logger = Logger.getLogger(SupervisorThread.class.getName());
    1013
    1114        private long sleep;
     
    4649                int minMessages = settings.getMinNumQueued();
    4750
    48                 // TODO treure merda...
     51       
    4952                Channel channel = supervisor.getBroker().getChannel();
    5053                DeclareOk dok = channel.queueDeclarePassive(reference);
     
    5558                System.out.println("Num Consumers: " + numConsumers + ", num Messages: " + numMessages);
    5659
    57                 if (maxMessages < numMessages || numConsumers < minObjects ) {
    58                         System.out.println("SPAWN TIME!!");
     60                if (maxMessages < numMessages || numConsumers < minObjects) {
     61                        logger.info("SPAWN TIME!!");
    5962                        supervisor.spawnObject(settings);
    60                         // spawn:
    61                         // pregunta a tots i qui no t�� l'objecte li poses
    6263                } else if (numMessages < minMessages && minObjects < numConsumers) {
    63                         System.out.println("Unbinding object!!!");
     64                        logger.info("Unbinding object!!!");
    6465                        supervisor.unbindObject(settings);
    65                         // delete:
    66                         // pregunta a tots i qui t�� l'objecte li treus
    6766                }
    6867        }
Note: See TracChangeset for help on using the changeset viewer.