package omq.server.remote.request; import java.util.concurrent.BlockingQueue; import omq.common.message.Request; import omq.common.message.Response; import omq.common.util.Serializer; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer.Delivery; /** * * @author Sergi Toda * */ public class InvocationThread extends Thread { private RemoteObject obj; private BlockingQueue deliveryQueue; private boolean killed = false; public InvocationThread(RemoteObject obj, BlockingQueue deliveryQueue) { this.obj = obj; this.deliveryQueue = deliveryQueue; } @Override public void run() { while (!killed) { try { // Get the delivery Delivery delivery = deliveryQueue.take(); // Deserialize the json Request request = Serializer.deserializeRequest(delivery.getBody(), obj); //Log.saveLog("Server-Deserialize", delivery.getBody()); String methodName = request.getMethod(); String requestID = request.getId(); System.out.println("Invoke method: " + methodName + " CorrID: " + requestID); // Changed --------------------------------------- Object result = null; if ("commit".equalsIgnoreCase(methodName)) { Object[] arguments = request.getArguments(); arguments[1] = ((String) arguments[1]) + "@@" + requestID; result = obj.invokeMethod(methodName, arguments); } else { result = obj.invokeMethod(request.getMethod(), request.getArguments()); } // ----------------------------------------------- // // Invoke the method // Object result = obj.invokeMethod(request.getMethod(), // request.getArguments()); if (!request.isAsync()) { Response resp = new Response(request.getId(), obj.getRef(), result); Channel channel = obj.getChannel(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build(); byte[] bytesResponse = Serializer.serialize(resp); channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse); //Log.saveLog("Server-Serialize", bytesResponse); } } catch (InterruptedException i) { i.printStackTrace(); killed = true; } catch (Exception e) { System.out.println("Error a l'Invocation Thread \nException: " + e); e.printStackTrace(); } } } public RemoteObject getObj() { return obj; } public void setObj(RemoteObject obj) { this.obj = obj; } public BlockingQueue getDeliveryQueue() { return deliveryQueue; } public void setDeliveryQueue(BlockingQueue deliveryQueue) { this.deliveryQueue = deliveryQueue; } }