Add HTTP driver to oslo.messaging.drivers

This commit initiates the implementation of the HTTP driver class.
It also sets up the interfaces for HTTPListener and HTTPClient,
which will be implemented in subsequent stages.

Partially-implements: blueprint oslo-http-driver
Change-Id: Ic6c01c7eafb20527e061a8c98225c6f6c9439788
This commit is contained in:
Xiang Wang 2024-04-02 10:56:43 +09:00 committed by xiang-roger-wang
parent d06ac93054
commit a78c99a23c
3 changed files with 662 additions and 0 deletions

View File

@ -0,0 +1,250 @@
# Copyright 2024 LY Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import secrets
import socket
from oslo_config import cfg
from oslo_log import log as logging
import urllib3
from urllib3.exceptions import TimeoutError
from oslo_messaging._drivers import base
from oslo_messaging._drivers.http_driver import consul_operator
from oslo_messaging._drivers.http_driver import service_broker
from oslo_messaging import exceptions
DEFAULT_HOST = socket.gethostbyname(socket.getfqdn())
LOG = logging.getLogger(__name__)
urllib3.disable_warnings()
http_opts = [
cfg.IntOpt('max_rpc_request_retries',
default=5,
help='Maximum number of retries for sending RPC requests'),
]
def register_driver_opts(conf):
opt_group = cfg.OptGroup(name='oslo_messaging_http',
title='HTTP driver options')
conf.register_group(opt_group)
conf.register_opts(http_opts, group=opt_group)
class HTTPRequestException(Exception):
def __init__(self, message):
message = "Fail to send HTTP request: %s" % message
super(HTTPRequestException, self).__init__(message)
class HTTPClient(object):
def __init__(self, conf, allowed_remote_exmods):
"""To be implemented."""
pass # TODO(xiang-roger-wang): Implement __init__ method.
def call(self, service_name, message, timeout):
"""To be implemented."""
pass # TODO(xiang-roger-wang): Implement call method.
def cast(self, service_name, message):
"""To be implemented."""
pass # TODO(xiang-roger-wang): Implement cast method.
def fanout(self, service_name, topic, server, exchange, message):
"""To be implemented."""
pass # TODO(xiang-roger-wang): Implement fanout method.
def request_http(self, service_name, rpc_type, rpc_data, timeout=None):
"""To be implemented."""
pass # TODO(xiang-roger-wang): Implement request_http method.
class HTTPListener(base.Listener):
"""To be implemented."""
def __init__(self, driver, target, service_broker,
batch_size, batch_timeout):
super(HTTPListener, self).__init__(
batch_size, batch_timeout, driver.prefetch_size
)
class HTTPDriver(base.BaseDriver):
def __init__(self, conf, url,
default_exchange=None, allowed_remote_exmods=None,
*args, **kwargs):
super(HTTPDriver, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
register_driver_opts(conf)
# consul_operator is the only supported service broker for now.
self.service_broker = consul_operator.ConsulOperator()
self.http_client = HTTPClient(conf, allowed_remote_exmods)
self.retries = conf.oslo_messaging_http.max_rpc_request_retries
def _get_exchange(self, target):
return target.exchange or self._default_exchange
def listen(self, target, batch_size=1, batch_timeout=1):
listener = HTTPListener(
self, target, self.service_broker, batch_size,
batch_timeout)
return listener
def cleanup(self, *args):
pass
def listen_for_notifications(self, *args):
raise NotImplementedError(
'The listen notification for HTTP driver is not implemented')
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
call_monitor_timeout=None, retry=None, transport_options=None):
try:
return self._send(target, ctxt, message, wait_for_reply, timeout)
except (service_broker.ServiceBrokerException,
exceptions.InvalidTarget,
HTTPRequestException):
LOG.error('Failed to send data to target %(target)s'
% {'target': target})
raise
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None):
if target.server:
topic = '%s.%s' % (target.topic, target.server)
else:
topic = target.topic
if wait_for_reply:
# Call RPC
send_call_with_retry = retry_decorator(self._send_call,
self.retries,
(TimeoutError))
return send_call_with_retry(topic, target, ctxt, message, timeout)
else:
# Cast RPC
if target.fanout:
send_fanout_with_retry = retry_decorator(self._send_fanout,
self.retries)
send_fanout_with_retry(topic, target, ctxt, message)
else:
_send_cast_with_retry = retry_decorator(self._send_cast,
self.retries)
_send_cast_with_retry(topic, target, ctxt, message)
return None
def _send_call(self, topic, target, ctxt, message, timeout=None):
target_service = self._select_target_service(target, ctxt, message)
log_msg = "CALL topic '%(topic)s'" % {'topic': topic}
LOG.info(log_msg)
try:
resp = self.http_client.call(target_service, message, timeout)
except Exception as e:
LOG.error('RPC:call failed. detail=%s', str(e))
raise
else:
return resp
def _send_cast(self, topic, target, ctxt, message):
target_service = self._select_target_service(target, ctxt, message)
log_msg = "CAST topic '%(topic)s'" % {'topic': topic}
LOG.info(log_msg)
self.http_client.cast(target_service, message)
def _send_fanout(self, topic, target, ctxt, message):
target_service = self._select_target_service(target, ctxt, message)
exchange = self._get_exchange(target)
log_msg = "CAST FANOUT exchange '%(exchange)s', " \
"topic '%(topic)s'" % {'exchange': exchange,
'topic': topic}
LOG.info(log_msg)
self.http_client.fanout(target_service,
target.topic,
target.server,
exchange,
message)
def _select_target_service(self, target, ctxt, message):
"""Select a target service.
Select a target service from the list of available services
on Service Broker.
"""
service_list = []
if target.fanout:
service_name = 'fanout'
service_list = self._get_service_list(service_name)
else:
service_name = self._get_exchange(target) + '.' + target.topic
service_list = self._get_service_list(service_name, target.server)
LOG.info("Start sending message to service %s", service_name)
if len(service_list) == 0:
raise exceptions.InvalidTarget(
'Target does not exist on Service Broker', service_name)
self._pack_context_with_message(ctxt, message)
# Pick one target host randomly from service list
target_service = secrets.choice(service_list)
return target_service
def _get_service_list(self, service_name, server=None):
return self.service_broker.get_service_list(
service_name, server)
def _pack_context_with_message(self, ctxt, msg):
"""Pack context into msg."""
if isinstance(ctxt, dict):
context_d = ctxt
else:
context_d = ctxt.to_dict()
msg['args']['context'] = context_d
return msg
def send_notification(self, *args):
raise NotImplementedError(
'The send notification for HTTP driver is not implemented')
def retry_decorator(func, times, excluded_exceptions=()):
"""Retry decorator.
Return a decorated `func` that retries `times` times if any exceptions are
thrown except for those listed in `excluded_exceptions`.
:param times: The number of times to repeat the wrapped function/method.
:type times: Int
:param excluded_exceptions: Lists of exceptions to be excluded.
:type Exceptions: Tuple of Exceptions
"""
def new_func(*args, **kwargs):
attempt = 0
while attempt < times:
try:
return func(*args, **kwargs)
except excluded_exceptions:
# Skip this exception.
raise
except Exception as e:
# Otherwise, retry.
LOG.warning('Exception thrown when attempting to send a RPC '
'request with HTTP driver. Retrying attempt '
'%(attempt)s of %(times)s. '
'Exception: %(exception)s',
{'attempt': attempt + 1,
'times': times, 'exception': e})
attempt += 1
return func(*args, **kwargs)
return new_func

View File

@ -0,0 +1,409 @@
# Copyright 2024 LY Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from oslo_config import cfg
import testscenarios
from urllib3.exceptions import TimeoutError
import oslo_messaging
from oslo_messaging._drivers.http_driver import service_broker
from oslo_messaging._drivers import impl_http
from oslo_messaging import exceptions
from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
# Sample service list returned from the service broker.
formatted_services = [
{
"Node": "consul-server1",
"Datacenter": "dc1",
"ID": "hostname11:3000",
"Name": "nova.compute",
"Tags": [
"hostname1"
],
"Address": "1.2.3.4",
"Port": 3000,
"Meta": {
"reverse_proxy_endpoint": ""
}
},
{
"Node": "consul-server1",
"Datacenter": "dc1",
"ID": "hostname22:4000",
"Name": "nova.compute",
"Tags": [
"hostname1"
],
"Address": "2.3.4.5",
"Port": 4000,
"Meta": {
"reverse_proxy_endpoint": ""
}
}
]
ctxt = {
'domain': None,
'project_name': 'test-project',
'project_domain': None,
'timestamp': '2021-11-29T13:07:16.305218',
'auth_token': 'dummy_token',
'remote_address': '127.0.0.1',
'is_admin': 1,
'user': 'dummy_uuid',
'tenant': 'dummy_tenant_id',
'project_id': 'dummy_tenant_id',
'user_id': u'dummy_user_id',
'roles': [u'member', u'power_user'],
'request_id': 'req-12345',
'user_name': 'dummy_user_name'}
message = {
'args': {
'args':
{'spec_obj': {
'nova_object.version': '1.5',
'nova_object.name': 'RequestSpec',
'nova_object.data': {
'instance_uuid': 'dummy_uuid',
'nova_object.namespace': 'nova'
}
},
}},
'method': 'select_destinations',
'version': u'4.3',
'namespace': 'nova'
}
class Target(object):
def __init__(self):
self.server = None
self.exchange = "nova"
self.topic = "scheduler"
self.fanout = False
class FakeException(Exception):
pass
class TestHTTPDriver(test_utils.BaseTestCase):
def setUp(self):
conf = cfg.ConfigOpts()
super(TestHTTPDriver, self).setUp(conf=conf)
impl_http.register_driver_opts(conf)
self.messaging_conf.transport_url = 'http://'
self.config(max_rpc_request_retries=0, group="oslo_messaging_http")
transport = oslo_messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
self.driver = transport._driver
self.target = Target()
message['args']['context'] = ctxt
self.token = None
def test_driver_load(self):
self.assertIsInstance(self.driver, impl_http.HTTPDriver)
def setup_send(self, m_get_services, m_http_client, rpc_type='call'):
m_get_services.return_value = formatted_services
self.m_client = mock.MagicMock()
self.m_client.call.return_value = {'result': 'rpc_server_resp'}
self.m_client.fanout.return_value = None
m_http_client.return_value = self.m_client
self.driver = oslo_messaging.get_transport(self.conf)._driver
# Test cases for Call RPC.
@mock.patch('secrets.choice')
@mock.patch('oslo_messaging._drivers.impl_http.HTTPClient')
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_call_success(self, m_get_services, m_http_c, m_choice):
self.setup_send(m_get_services, m_http_c, m_choice)
m_choice.return_value = formatted_services[0]
ret = self.driver.send(
self.target, ctxt, message, wait_for_reply=True, timeout=10)
self.assertEqual({'result': 'rpc_server_resp'}, ret)
# Check Call RPC.
self.m_client.call.assert_called_once_with(formatted_services[0],
message, 10)
# Check if expected service name is passed to get the service list.
m_get_services.assert_called_once_with('nova.scheduler', None)
m_choice.assert_called_once_with(formatted_services)
@mock.patch('secrets.choice')
@mock.patch('oslo_messaging._drivers.impl_http.HTTPClient')
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_call_success_with_default_exchange(
self, m_get_services, m_http_client, m_choice):
self.setup_send(m_get_services, m_http_client, m_choice)
self.target.exchange = None
m_choice.return_value = formatted_services[0]
ret = self.driver.send(self.target, ctxt, message, wait_for_reply=True,
timeout=10)
self.assertEqual({'result': 'rpc_server_resp'}, ret)
# Check Call RPC
self.m_client.call.assert_called_once_with(formatted_services[0],
message, 10)
# Check if expected service name is passed to get the service list.
m_get_services.assert_called_once_with('openstack.scheduler', None)
m_choice.assert_called_once_with(formatted_services)
@mock.patch('oslo_messaging._drivers.impl_http.HTTPClient')
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_call_with_server_success(self, m_get_services,
m_http_client):
self.setup_send(m_get_services, m_http_client)
# Return hostname22(2.3.4.5:4000)
m_get_services.return_value = [formatted_services[1]]
# Set hostname22(2.3.4.5:4000) as target server
self.target.server = 'hostname22'
ret = self.driver.send(
self.target, ctxt, message, wait_for_reply=True, timeout=10)
self.assertEqual({'result': 'rpc_server_resp'}, ret)
# Check Call RPC
self.m_client.call.assert_called_once_with(formatted_services[1],
message, 10)
# Check if expected service name is passed to get the service list.
m_get_services.assert_called_once_with('nova.scheduler', 'hostname22')
@mock.patch('secrets.choice')
@mock.patch('oslo_messaging._drivers.impl_http.HTTPClient')
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_call_with_retry_once(self, m_get_services, m_http_client,
m_choice):
self.setup_send(m_get_services, m_http_client)
# Set # of retries to be 1.
self.driver.retries = 1
m_choice.return_value = formatted_services[0]
# Raise exception for the first call, and succeed in the second call.
self.m_client.call.side_effect = [Exception('fake error'),
{'result': 'rpc_server_resp'}]
ret = self.driver.send(
self.target, ctxt, message, wait_for_reply=True, timeout=10)
self.assertEqual({'result': 'rpc_server_resp'}, ret)
# Check Call RPC
self.m_client.call.assert_called_with(formatted_services[0],
message, 10)
# Ensure it has been called twice (1 + 1 retry).
self.assertEqual(2, self.m_client.call.call_count)
# Check if expected service name is passed to get the service list.
m_get_services.assert_called_with('nova.scheduler', None)
self.assertEqual(2, m_get_services.call_count)
m_choice.assert_called_with(formatted_services)
self.assertEqual(2, m_choice.call_count)
@mock.patch('secrets.choice')
@mock.patch('oslo_messaging._drivers.impl_http.HTTPClient')
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_call_with_retry_twice(self, m_get_services, m_http_client,
m_choice):
self.setup_send(m_get_services, m_http_client)
# Set # of retries to be 2.
self.driver.retries = 2
m_choice.return_value = formatted_services[0]
# Raise exception for the first call, and succeed in the second call.
self.m_client.call.side_effect = [Exception('fake error 1'),
Exception('fake error 2'),
{'result': 'rpc_server_resp'}]
ret = self.driver.send(
self.target, ctxt, message, wait_for_reply=True, timeout=10)
self.assertEqual({'result': 'rpc_server_resp'}, ret)
# Check Call RPC
self.m_client.call.assert_called_with(formatted_services[0],
message, 10)
# Ensure it has been called 3 times (1 + 2 retries).
self.assertEqual(3, self.m_client.call.call_count)
# Check if expected service name is passed to get the service list.
m_get_services.assert_called_with('nova.scheduler', None)
self.assertEqual(3, m_get_services.call_count)
m_choice.assert_called_with(formatted_services)
self.assertEqual(3, m_choice.call_count)
@mock.patch('secrets.choice')
@mock.patch('oslo_messaging._drivers.impl_http.HTTPClient')
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_call_with_retry_and_excluded_errors(self,
m_get_services,
m_http_client,
m_choice):
self.setup_send(m_get_services, m_http_client)
# Set # of retries to be 1.
self.driver.retries = 1
m_choice.return_value = formatted_services[0]
# Raise exception for the first call, and succeed in the second call.
self.m_client.call.side_effect = [TimeoutError('fake error'),
{'result': 'rpc_server_resp'}]
# There should be no retry since TimeoutError is an excluded exception.
self.assertRaises(TimeoutError,
self.driver.send, self.target, ctxt, message,
wait_for_reply=True, timeout=10)
# Check Call RPC
self.m_client.call.assert_called_once_with(formatted_services[0],
message, 10)
# Check if expected service name is passed to get the service list.
m_get_services.assert_called_once_with('nova.scheduler', None)
m_choice.assert_called_once_with(formatted_services)
# Test cases for Cast RPC.
@mock.patch('secrets.choice')
@mock.patch('oslo_messaging._drivers.impl_http.HTTPClient')
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_cast_success(self, m_get_services, m_http_client,
m_choice):
self.setup_send(m_get_services, m_http_client)
m_choice.return_value = formatted_services[0]
ret = self.driver.send(
self.target, ctxt, message, wait_for_reply=False)
self.assertIsNone(ret)
# Check if expected service name is passed to get the service list.
m_get_services.assert_called_once_with('nova.scheduler', None)
# Check Cast RPC
self.m_client.cast.assert_called_once_with(formatted_services[0],
message)
m_choice.assert_called_once_with(formatted_services)
# Test cases for Cast RPC.
@mock.patch('secrets.choice')
@mock.patch('oslo_messaging._drivers.impl_http.HTTPClient')
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_cast_with_retry_once(self, m_get_services, m_http_client,
m_choice):
self.setup_send(m_get_services, m_http_client)
# Set # of retries to be 1.
self.driver.retries = 1
m_choice.return_value = formatted_services[0]
# Raise exception for the first call, and succeed in the second call.
self.m_client.cast.side_effect = [Exception('fake error'),
None]
ret = self.driver.send(
self.target, ctxt, message, wait_for_reply=False)
self.assertIsNone(ret)
# Check if expected service name is passed to get the service list.
m_get_services.assert_called_with('nova.scheduler', None)
self.assertEqual(2, m_get_services.call_count)
# Check Cast RPC
self.m_client.cast.assert_called_with(formatted_services[0],
message)
self.assertEqual(2, self.m_client.cast.call_count)
m_choice.assert_called_with(formatted_services)
self.assertEqual(2, m_choice.call_count)
# Test cases for Fanout RPC
@mock.patch('secrets.choice')
@mock.patch('oslo_messaging._drivers.impl_http.HTTPClient')
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_fanout_success(self, m_get_services,
m_http_client, m_choice):
self.setup_send(m_get_services, m_http_client)
m_choice.return_value = formatted_services[0]
self.target.fanout = True
ret = self.driver.send(
self.target, ctxt, message, wait_for_reply=False)
self.assertIsNone(ret)
# Check if expected service name is passed to get the service list.
m_get_services.assert_called_once_with('fanout')
# Check Fanout RPC
self.m_client.fanout.assert_called_once_with(formatted_services[0],
'scheduler', None,
'nova', message)
@mock.patch('secrets.choice')
@mock.patch('oslo_messaging._drivers.impl_http.HTTPClient')
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_fanout_with_retry_once(self, m_get_services,
m_http_client,
m_choice):
self.setup_send(m_get_services, m_http_client)
# Set # of retries to be 1.
self.driver.retries = 1
m_choice.return_value = formatted_services[0]
self.target.fanout = True
# Raise exception for the first call, and succeed in the second call.
self.m_client.fanout.side_effect = [Exception('fake error'),
None]
ret = self.driver.send(
self.target, ctxt, message, wait_for_reply=False)
self.assertIsNone(ret)
# Check if expected service name is passed to get the service list.
m_get_services.assert_called_with('fanout')
self.assertEqual(2, m_get_services.call_count)
# Check Fanout RPC
self.m_client.fanout.assert_called_with(formatted_services[0],
'scheduler', None,
'nova', message)
self.assertEqual(2, self.m_client.fanout.call_count)
# Test with no service found from the service broker.
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_service_not_found(self, m_get_services):
m_get_services.return_value = []
self.target.fanout = False
driver = oslo_messaging.get_transport(self.conf)._driver
self.assertRaises(exceptions.InvalidTarget,
driver.send,
self.target,
ctxt,
message,
wait_for_reply=True,
timeout=60)
# Test with ServiceBrokerException.
@mock.patch.object(impl_http.HTTPDriver, "_get_service_list")
def test_send_with_service_broker_error(self, m_get_services):
m_get_services.side_effect = [
service_broker.ServiceBrokerException('fake service broker error')]
self.target.fanout = False
driver = oslo_messaging.get_transport(self.conf)._driver
self.assertRaises(service_broker.ServiceBrokerException,
driver.send,
self.target,
ctxt,
message,
wait_for_reply=True,
timeout=60)

View File

@ -54,6 +54,9 @@ oslo.messaging.drivers =
# This is just for internal testing
fake = oslo_messaging._drivers.impl_fake:FakeDriver
# HTTP driver
http = oslo_messaging._drivers.impl_http:HTTPDriver
oslo.messaging.executors =
eventlet = futurist:GreenThreadPoolExecutor
threading = futurist:ThreadPoolExecutor