Ignore:
Timestamp:
09/27/13 17:50:41 (11 years ago)
Author:
stoda
Message:

Semaphores added and removed, ack error discovered and solutioned... Some tests added

Supervisor interface created and more things I'll do later...

TODO: supervisor!!

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/server/InvocationThread.java

    r83 r91  
    2424        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
    2525        private RemoteObject obj;
    26         private transient Serializer serializer;
     26        private Serializer serializer;
     27        // private RemoteWrapper wrapper;
    2728        private BlockingQueue<Delivery> deliveryQueue;
    2829        private boolean killed = false;
    2930
    30         public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue, Serializer serializer) {
     31        public InvocationThread(RemoteObject obj, RemoteWrapper wrapper, Serializer serializer) {
    3132                this.obj = obj;
    32                 this.deliveryQueue = deliveryQueue;
     33                // this.wrapper = wrapper;
     34                this.deliveryQueue = wrapper.getDeliveryQueue();
    3335                this.serializer = serializer;
    3436        }
     
    4042                                // Get the delivery
    4143                                Delivery delivery = deliveryQueue.take();
     44
     45                                // // Indicate this thread is not available
     46                                // wrapper.increaseBusy();
    4247
    4348                                String serializerType = delivery.getProperties().getType();
     
    6469                                }
    6570
     71                               
     72                                Channel channel = obj.getChannel();
     73                               
     74                               
    6675                                // Reply if it's necessary
    6776                                if (!request.isAsync()) {
    6877                                        Response resp = new Response(request.getId(), obj.getRef(), result, error);
    6978
    70                                         Channel channel = obj.getChannel();
     79                                       
    7180
    7281                                        BasicProperties props = delivery.getProperties();
     
    7988                                                        + props.getReplyTo());
    8089                                }
    81 
     90                               
     91                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     92                               
     93                                // // Indicate this thread is available
     94                                // wrapper.decreaseBusy();
    8295                        } catch (InterruptedException i) {
    8396                                logger.error(i);
Note: See TracChangeset for help on using the changeset viewer.