Changeset 91


Ignore:
Timestamp:
09/27/13 17:50:41 (11 years ago)
Author:
stoda
Message:

Semaphores added and removed, ack error discovered and solutioned... Some tests added

Supervisor interface created and more things I'll do later...

TODO: supervisor!!

Location:
branches/supervisor
Files:
12 added
1 deleted
4 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/common/broker/Broker.java

    r83 r91  
    4949        private boolean connectionClosed = false;
    5050        private Properties environment = null;
     51        private RemoteBrokerImpl remoteBrokerImpl;
    5152        private Map<String, RemoteObject> remoteObjs;
    5253        private Map<String, Object> proxies = new Hashtable<String, Object>();
     
    357358        }
    358359
     360        public void setSupervisor(String brokerSet, String brokerName) throws Exception {
     361                remoteBrokerImpl = new RemoteBrokerImpl();
     362                remoteBrokerImpl.startRemoteBroker(brokerSet, brokerName, this, getEnvironment());
     363        }
     364
    359365        public Properties getEnvironment() {
    360366                return environment;
     
    368374                return serializer;
    369375        }
     376
     377        public Map<String, RemoteObject> getRemoteObjs() {
     378                return remoteObjs;
     379        }
     380
    370381}
  • branches/supervisor/src/main/java/omq/server/InvocationThread.java

    r83 r91  
    2424        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
    2525        private RemoteObject obj;
    26         private transient Serializer serializer;
     26        private Serializer serializer;
     27        // private RemoteWrapper wrapper;
    2728        private BlockingQueue<Delivery> deliveryQueue;
    2829        private boolean killed = false;
    2930
    30         public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue, Serializer serializer) {
     31        public InvocationThread(RemoteObject obj, RemoteWrapper wrapper, Serializer serializer) {
    3132                this.obj = obj;
    32                 this.deliveryQueue = deliveryQueue;
     33                // this.wrapper = wrapper;
     34                this.deliveryQueue = wrapper.getDeliveryQueue();
    3335                this.serializer = serializer;
    3436        }
     
    4042                                // Get the delivery
    4143                                Delivery delivery = deliveryQueue.take();
     44
     45                                // // Indicate this thread is not available
     46                                // wrapper.increaseBusy();
    4247
    4348                                String serializerType = delivery.getProperties().getType();
     
    6469                                }
    6570
     71                               
     72                                Channel channel = obj.getChannel();
     73                               
     74                               
    6675                                // Reply if it's necessary
    6776                                if (!request.isAsync()) {
    6877                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
    6978
    70                                         Channel channel = obj.getChannel();
     79                                       
    7180
    7281                                        BasicProperties props = delivery.getProperties();
     
    7988                                                        + props.getReplyTo());
    8089                                }
    81 
     90                               
     91                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     92                               
     93                                // // Indicate this thread is available
     94                                // wrapper.decreaseBusy();
    8295                        } catch (InterruptedException i) {
    8396                                logger.error(i);
  • branches/supervisor/src/main/java/omq/server/RemoteObject.java

    r84 r91  
    3838        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
    3939
    40         private String UID;
    41         private Properties env;
    42         private transient Broker broker;
    43         private transient String multiQueue;
    44         private transient RemoteWrapper remoteWrapper;
    45         private transient Map<String, List<Class<?>>> params;
    46         private transient Channel channel;
    47         private transient QueueingConsumer consumer;
    48         private transient boolean killed = false;
     40        protected String UID;
     41        protected Properties env;
     42        protected transient Broker broker;
     43        protected transient String multiQueue;
     44        protected transient RemoteWrapper remoteWrapper;
     45        protected transient Map<String, List<Class<?>>> params;
     46        protected transient Channel channel;
     47        protected transient QueueingConsumer consumer;
     48        protected transient boolean killed = false;
    4949
    5050        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
     
    317317                 */
    318318
     319                boolean autoAck = false;
     320
    319321                // Declare a new consumer
    320322                consumer = new QueueingConsumer(channel);
    321                 channel.basicConsume(queue, true, consumer);
    322                 channel.basicConsume(multiQueue, true, consumer);
     323                channel.basicConsume(queue, autoAck, consumer);
     324                channel.basicConsume(multiQueue, autoAck, consumer);
    323325        }
    324326
  • branches/supervisor/src/main/java/omq/server/RemoteWrapper.java

    r83 r91  
    2424        private RemoteObject obj;
    2525        private int numThreads;
     26        // private AtomicInteger busy;
     27        private Object waitLock;
    2628        private ArrayList<InvocationThread> invocationList;
    2729        private BlockingQueue<Delivery> deliveryQueue;
     
    3032                this.obj = obj;
    3133                this.numThreads = numThreads;
     34                // this.busy = new AtomicInteger(0);
     35                this.waitLock = new Object();
    3236                invocationList = new ArrayList<InvocationThread>();
    3337                deliveryQueue = new LinkedBlockingDeque<QueueingConsumer.Delivery>();
     
    3640
    3741                for (int i = 0; i < numThreads; i++) {
    38                         InvocationThread thread = new InvocationThread(obj, deliveryQueue, serializer);
     42                        InvocationThread thread = new InvocationThread(obj, this, serializer);
    3943                        invocationList.add(thread);
    4044                        thread.start();
     
    5155         */
    5256        public void notifyDelivery(Delivery delivery) throws Exception {
     57
     58                // // Ensure there is at least one thread available
     59                // while (this.busy.get() == this.numThreads) {
     60                // System.out.println("Waiting for a thread available");
     61                // logger.debug("Object reference: " + obj.getRef() + " is busy");
     62                //
     63                // synchronized (waitLock) {
     64                // waitLock.wait();
     65                // }
     66                // }
     67                // Notify an available thread
    5368                this.deliveryQueue.put(delivery);
     69
    5470        }
    5571
     
    6379                }
    6480        }
     81
     82        // public int increaseBusy() {
     83        // return this.busy.incrementAndGet();
     84        // }
     85        //
     86        // public int decreaseBusy() {
     87        // int value = this.busy.decrementAndGet();
     88        // synchronized (waitLock) {
     89        // waitLock.notifyAll();
     90        // }
     91        // return value;
     92        // }
    6593
    6694        public RemoteObject getObj() {
     
    95123                this.deliveryQueue = deliveryQueue;
    96124        }
     125
     126        public Object getLock() {
     127                return waitLock;
     128        }
     129
     130        public void setLock(Object lock) {
     131                this.waitLock = lock;
     132        }
    97133}
Note: See TracChangeset for help on using the changeset viewer.