Option rpc_response_timeout should not be used in zmq driver
Client option rpc_response_timeout means "Seconds to wait for a response from a call"[1], but it is used by cast in zmq driver. And it's not the right way that underlying zmq driver uses an upper client option. So rpc_response_timeout should not be used in zmq driver, and let's just remove it since rpc client has passed the parameter[2]. ref: [1]: https://github.com/openstack/oslo.messaging/blob/master/oslo_messaging/rpc/client.py#L37 [2]: https://github.com/openstack/oslo.messaging/blob/master/oslo_messaging/rpc/client.py#L150 Change-Id: I86fe4d1f18faf8d4af666301b00e5907dce4417b
This commit is contained in:
parent
03888224a4
commit
46ec05beae
|
@ -202,13 +202,12 @@ class ZmqDriver(base.BaseDriver):
|
||||||
:type retry: int
|
:type retry: int
|
||||||
"""
|
"""
|
||||||
client = self.client.get()
|
client = self.client.get()
|
||||||
timeout = timeout or self.conf.rpc_response_timeout
|
|
||||||
if wait_for_reply:
|
if wait_for_reply:
|
||||||
return client.send_call(target, ctxt, message, timeout, retry)
|
return client.send_call(target, ctxt, message, timeout, retry)
|
||||||
elif target.fanout:
|
elif target.fanout:
|
||||||
client.send_fanout(target, ctxt, message, timeout, retry)
|
client.send_fanout(target, ctxt, message, retry)
|
||||||
else:
|
else:
|
||||||
client.send_cast(target, ctxt, message, timeout, retry)
|
client.send_cast(target, ctxt, message, retry)
|
||||||
|
|
||||||
def send_notification(self, target, ctxt, message, version, retry=None):
|
def send_notification(self, target, ctxt, message, version, retry=None):
|
||||||
"""Send notification to server
|
"""Send notification to server
|
||||||
|
|
|
@ -47,16 +47,16 @@ class ZmqClientBase(object):
|
||||||
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
|
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
|
||||||
return self.call_publisher.send_request(request)
|
return self.call_publisher.send_request(request)
|
||||||
|
|
||||||
def send_cast(self, target, context, message, timeout=None, retry=None):
|
def send_cast(self, target, context, message, retry=None):
|
||||||
with contextlib.closing(zmq_request.CastRequest(
|
with contextlib.closing(zmq_request.CastRequest(
|
||||||
target, context=context, message=message,
|
target, context=context, message=message,
|
||||||
timeout=timeout, retry=retry)) as request:
|
retry=retry)) as request:
|
||||||
self.cast_publisher.send_request(request)
|
self.cast_publisher.send_request(request)
|
||||||
|
|
||||||
def send_fanout(self, target, context, message, timeout=None, retry=None):
|
def send_fanout(self, target, context, message, retry=None):
|
||||||
with contextlib.closing(zmq_request.FanoutRequest(
|
with contextlib.closing(zmq_request.FanoutRequest(
|
||||||
target, context=context, message=message,
|
target, context=context, message=message,
|
||||||
timeout=timeout, retry=retry)) as request:
|
retry=retry)) as request:
|
||||||
self.fanout_publisher.send_request(request)
|
self.fanout_publisher.send_request(request)
|
||||||
|
|
||||||
def send_notify(self, target, context, message, version, retry=None):
|
def send_notify(self, target, context, message, version, retry=None):
|
||||||
|
|
|
@ -91,21 +91,8 @@ class RpcRequest(Request):
|
||||||
LOG.error(_LE("No method specified for RPC call"))
|
LOG.error(_LE("No method specified for RPC call"))
|
||||||
raise KeyError(errmsg)
|
raise KeyError(errmsg)
|
||||||
|
|
||||||
self.timeout = kwargs.pop("timeout")
|
|
||||||
assert self.timeout is not None, "Timeout should be specified!"
|
|
||||||
|
|
||||||
if not isinstance(self.timeout, int) and self.timeout is not None:
|
|
||||||
raise ValueError(
|
|
||||||
"timeout must be an integer, not {0}"
|
|
||||||
.format(type(self.timeout)))
|
|
||||||
|
|
||||||
super(RpcRequest, self).__init__(*args, **kwargs)
|
super(RpcRequest, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
def create_envelope(self):
|
|
||||||
envelope = super(RpcRequest, self).create_envelope()
|
|
||||||
envelope['timeout'] = self.timeout
|
|
||||||
return envelope
|
|
||||||
|
|
||||||
|
|
||||||
class CallRequest(RpcRequest):
|
class CallRequest(RpcRequest):
|
||||||
|
|
||||||
|
@ -113,8 +100,22 @@ class CallRequest(RpcRequest):
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self.allowed_remote_exmods = kwargs.pop("allowed_remote_exmods")
|
self.allowed_remote_exmods = kwargs.pop("allowed_remote_exmods")
|
||||||
|
|
||||||
|
self.timeout = kwargs.pop("timeout")
|
||||||
|
if self.timeout is None:
|
||||||
|
raise ValueError("Timeout should be specified for a RPC call!")
|
||||||
|
elif not isinstance(self.timeout, int):
|
||||||
|
raise ValueError(
|
||||||
|
"timeout must be an integer, not {0}"
|
||||||
|
.format(type(self.timeout)))
|
||||||
|
|
||||||
super(CallRequest, self).__init__(*args, **kwargs)
|
super(CallRequest, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
def create_envelope(self):
|
||||||
|
envelope = super(CallRequest, self).create_envelope()
|
||||||
|
envelope['timeout'] = self.timeout
|
||||||
|
return envelope
|
||||||
|
|
||||||
|
|
||||||
class CastRequest(RpcRequest):
|
class CastRequest(RpcRequest):
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,9 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase):
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
KeyError,
|
KeyError,
|
||||||
self.driver.send,
|
self.driver.send,
|
||||||
target, {}, {'tx_id': 1}, wait_for_reply=True)
|
target, {}, {'tx_id': 1},
|
||||||
|
wait_for_reply=True,
|
||||||
|
timeout=60)
|
||||||
|
|
||||||
def test_send_receive_topic(self):
|
def test_send_receive_topic(self):
|
||||||
"""Call() with topic."""
|
"""Call() with topic."""
|
||||||
|
@ -85,7 +87,8 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase):
|
||||||
result = self.driver.send(
|
result = self.driver.send(
|
||||||
target, {},
|
target, {},
|
||||||
{'method': 'hello-world', 'tx_id': 1},
|
{'method': 'hello-world', 'tx_id': 1},
|
||||||
wait_for_reply=True)
|
wait_for_reply=True,
|
||||||
|
timeout=60)
|
||||||
self.assertTrue(result)
|
self.assertTrue(result)
|
||||||
|
|
||||||
def test_send_noreply(self):
|
def test_send_noreply(self):
|
||||||
|
@ -130,7 +133,8 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase):
|
||||||
message = {'method': 'hello-world', 'tx_id': 1}
|
message = {'method': 'hello-world', 'tx_id': 1}
|
||||||
context = {}
|
context = {}
|
||||||
result = self.driver.send(target, context, message,
|
result = self.driver.send(target, context, message,
|
||||||
wait_for_reply=True)
|
wait_for_reply=True,
|
||||||
|
timeout=60)
|
||||||
self.assertTrue(result)
|
self.assertTrue(result)
|
||||||
|
|
||||||
def test_send_receive_notification(self):
|
def test_send_receive_notification(self):
|
||||||
|
|
|
@ -51,7 +51,7 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
with contextlib.closing(zmq_request.FanoutRequest(
|
with contextlib.closing(zmq_request.FanoutRequest(
|
||||||
target, context={}, message={'method': 'hello-world'},
|
target, context={}, message={'method': 'hello-world'},
|
||||||
timeout=0, retry=None)) as request:
|
retry=None)) as request:
|
||||||
self.publisher.send_request([request.create_envelope(),
|
self.publisher.send_request([request.create_envelope(),
|
||||||
pickle.dumps(request)])
|
pickle.dumps(request)])
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue