package omq.server;

import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.BlockingQueue;

import omq.common.message.Request;
import omq.common.message.Response;
import omq.common.util.Serializer;
import omq.exception.OmqException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
 * Worker thread that takes deliveries from a queue, invokes the requested
 * method on a {@link RemoteObject} and, for non-async requests, publishes a
 * {@link Response} to the delivery's reply-to queue.
 *
 * <p>
 * The loop ends when the thread is interrupted while waiting for a delivery.
 * Any other exception raised while handling a delivery is printed and the
 * loop continues with the next delivery.
 *
 * @author Sergi Toda
 *
 */
public class InvocationThread extends Thread {

	private RemoteObject obj;
	private BlockingQueue<Delivery> deliveryQueue;
	// Set by this thread itself once it has been interrupted; ends the loop.
	private boolean killed = false;

	/**
	 * Creates a worker bound to a remote object and a source of deliveries.
	 *
	 * @param obj
	 *            object whose methods are invoked and whose channel and
	 *            reference are used when replying
	 * @param deliveryQueue
	 *            queue the deliveries are taken from
	 */
	public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue) {
		this.obj = obj;
		this.deliveryQueue = deliveryQueue;
	}

	/**
	 * Processes deliveries one at a time until the thread is interrupted.
	 */
	@Override
	public void run() {
		while (!killed) {
			try {
				// Get the delivery (blocks until one is available)
				Delivery delivery = deliveryQueue.take();

				BasicProperties props = delivery.getProperties();
				String serializerType = props.getType();

				// Deserialize the request using the serializer named in the
				// message properties
				Request request = Serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
				// Log.saveLog("Server-Deserialize", delivery.getBody());

				String methodName = request.getMethod();
				String requestID = request.getId();

				System.out.println("Invoke method: " + methodName + " CorrID: " + requestID);

				// Invoke the method; failures are captured so they can be
				// reported to the caller instead of aborting the delivery
				Object result = null;
				OmqException error = null;
				try {
					result = obj.invokeMethod(methodName, request.getParams());
				} catch (InvocationTargetException e) {
					// Report the exception thrown by the invoked method, not
					// the reflection wrapper
					Throwable throwable = e.getTargetException();
					error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage());
				} catch (NoSuchMethodException e) {
					error = new OmqException(e.getClass().getCanonicalName(), e.getMessage());
				}

				// Reply if it's necessary
				if (!request.isAsync()) {
					Response resp = new Response(requestID, obj.getRef(), result, error);

					Channel channel = obj.getChannel();

					// Echo the correlation id of the request in the reply
					BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef())
							.correlationId(props.getCorrelationId()).build();

					byte[] bytesResponse = Serializer.serialize(serializerType, resp);
					channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
					// Log.saveLog("Server-Serialize", bytesResponse);
				}
			} catch (InterruptedException i) {
				i.printStackTrace();
				// Catching InterruptedException clears the interrupt status;
				// restore it so code observing this thread can still see it.
				Thread.currentThread().interrupt();
				killed = true;
			} catch (Exception e) {
				System.out.println("Error a l'Invocation Thread \nException: " + e);
				e.printStackTrace();
			}
		}
	}

	/**
	 * @return the remote object this thread invokes methods on
	 */
	public RemoteObject getObj() {
		return obj;
	}

	/**
	 * @param obj
	 *            the remote object to invoke methods on
	 */
	public void setObj(RemoteObject obj) {
		this.obj = obj;
	}

	/**
	 * @return the queue deliveries are taken from
	 */
	public BlockingQueue<Delivery> getDeliveryQueue() {
		return deliveryQueue;
	}

	/**
	 * @param deliveryQueue
	 *            the queue to take deliveries from
	 */
	public void setDeliveryQueue(BlockingQueue<Delivery> deliveryQueue) {
		this.deliveryQueue = deliveryQueue;
	}
}