source: PythonClient/src/client/Client.py @ 80

Last change on this file since 80 was 80, checked in by stoda, 11 years ago

first commit

File size: 3.8 KB
Line 
'''
Created on 03/07/2013

@author: sergi
'''
6import pika
7import uuid
8import json
9
class Client(object):
    '''
    Blocking RabbitMQ (pika) client that publishes JSON-encoded method
    invocations and, for synchronous calls, waits for the replies.

    Every request is a JSON object with the keys "method", "params", "id"
    (a fresh UUID4 string that is also used as the AMQP correlation id) and
    "async" ("true" or "false"). Requests are routed with the target object's
    uid as routing key, either through the configured RPC exchange or, for
    the "multi" variants, through the exchange named "multi#<uid>".
    '''

    def __init__(self, env):
        '''
        Open the connection, declare the reply queue and start consuming it.

        env only needs a dict-like ``get(key, default)``. Keys read here and
        their defaults: 'exchange' ('rpc_exchange'), 'reply_queue'
        ('reply_queue'), 'host' ('localhost'), 'port' (5672), 'ssl' (False),
        'user' ('guest') and 'pass' ('guest').
        '''
        self.rpc_exchange = env.get('exchange', 'rpc_exchange')
        self.reply_queue = env.get('reply_queue', 'reply_queue')

        self.host = env.get('host', 'localhost')
        self.port = env.get('port', 5672)
        self.ssl = env.get('ssl', False)

        # State used by on_response. It must exist before the consumer is
        # registered below, otherwise an early delivery on the reply queue
        # would hit missing attributes.
        self.corr_id = None
        self.response = None
        self._pending = []

        self.credentials = pika.PlainCredentials(env.get('user', 'guest'), env.get('pass', 'guest'))

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port, credentials=self.credentials, ssl=self.ssl))

        self.channel = self.connection.channel()

        self.callback_queue = self.channel.queue_declare(queue=self.reply_queue)

        self.channel.basic_consume(self.on_response, no_ack=True, queue=self.reply_queue)

    def close(self):
        '''Close the underlying connection.'''
        self.connection.close()

    def async_call(self, uid, method, params):
        '''
        Invoke 'method' with 'params' on the object bound to 'uid' through
        the RPC exchange, without waiting for a reply.
        '''
        self.__async(uid, method, params, False)

    def multi_async_call(self, uid, method, params):
        '''
        Same as async_call, but published to the "multi#<uid>" exchange.
        '''
        self.__async(uid, method, params, True)

    def __async(self, uid, method, params, multi):
        '''Publish a fire-and-forget request (empty reply_to, "async": "true").'''
        if multi:
            exch = "multi#" + uid
        else:
            exch = self.rpc_exchange

        self.__publish(exch, uid, method, params, str(uuid.uuid4()), "", "true")

    def __publish(self, exch, uid, method, params, corr_id, reply_to, is_async):
        '''Serialise one request, publish it on 'exch' and log what was sent.'''
        request = json.dumps({"method": method, "params": params, "id": corr_id, "async": is_async})

        props = pika.BasicProperties(app_id=uid, correlation_id=corr_id, reply_to=reply_to, type='gson')

        self.channel.basic_publish(exchange=exch, routing_key=uid, properties=props, body=request)

        # Single-argument print() emits the same text on Python 2 and 3.
        print("UID: " + uid + ", exchange: " + exch + " ,corrId = " + corr_id + " , json: " + request)

    def __start_sync(self):
        '''Forget any previous reply and pick the correlation id to wait for.'''
        self.response = None
        self._pending = []
        self.corr_id = str(uuid.uuid4())

    def sync_call(self, uid, method, params):
        '''
        Invoke 'method' with 'params' on the object bound to 'uid' and block
        until the matching reply arrives.

        Returns the "result" member of the JSON reply. There is no timeout:
        the call does not return until a reply with the right correlation id
        has been received.
        '''
        self.__start_sync()

        # Reply to the queue this client actually consumes (it is
        # configurable through env), not to a hard-coded name.
        self.__publish(self.rpc_exchange, uid, method, params, self.corr_id, self.reply_queue, "false")

        return self.__get_response()

    def multi_sync_call(self, uid, method, params, wait):
        '''
        Publish one request to the "multi#<uid>" exchange and collect replies.

        'wait' is the number of replies to gather before returning. Returns
        a list with the "result" member of each reply, in arrival order.
        Blocks until that many matching replies have been received.
        '''
        responses = []
        self.__start_sync()

        self.__publish("multi#" + uid, uid, method, params, self.corr_id, self.reply_queue, "false")

        while len(responses) < wait:
            responses.append(self.__get_response())
            self.response = None
        return responses

    def __get_response(self):
        '''
        Return the oldest queued reply, pumping connection events until one
        is available.
        '''
        # Test the queue rather than "response is None": several replies can
        # be dispatched by one process_data_events() call, and a reply whose
        # result is JSON null is still a reply.
        while not self._pending:
            self.connection.process_data_events()
        self.response = self._pending.pop(0)
        return self.response

    def on_response(self, ch, method, props, body):
        '''
        Consumer callback for the reply queue.

        Replies whose correlation id differs from the one currently awaited
        are ignored. For matching ones, the "result" member of the JSON body
        is queued for __get_response and mirrored in self.response.
        '''
        if self.corr_id is not None and self.corr_id == props.correlation_id:
            result = json.loads(body)
            self.response = result["result"]
            self._pending.append(self.response)
100       
101       
Note: See TracBrowser for help on using the repository browser.