Make common Metadata Proxy classes

The ML2 and OVN metadata agents have almost identical
code, as the former was copied to the latter and modified.
Instead, combine all the common parts and just have
each do any proxy-specific operations separately.

Change-Id: Ie1a482f4a028cd61233157f992f5476117e9a6cc
This commit is contained in:
Brian Haley 2024-03-27 10:22:56 -04:00
parent bcf1f707bc
commit 7b4e9f8c26
6 changed files with 501 additions and 559 deletions

View File

@ -12,10 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import urllib
import netaddr
from neutron_lib.agent import topics
from neutron_lib import constants
from neutron_lib import context
@ -23,27 +19,16 @@ from neutron_lib.utils import host
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import netutils
import requests
import webob
from neutron._i18n import _
from neutron.agent.common import base_agent_rpc
from neutron.agent.linux import utils as agent_utils
from neutron.agent.metadata import proxy_base
from neutron.agent import rpc as agent_rpc
from neutron.common import cache_utils as cache
from neutron.common import ipv6_utils
from neutron.common import utils as common_utils
from neutron.conf.agent.metadata import config
LOG = logging.getLogger(__name__)
MODE_MAP = {
config.USER_MODE: 0o644,
config.GROUP_MODE: 0o664,
config.ALL_MODE: 0o666,
}
class MetadataPluginAPI(base_agent_rpc.BasePluginApi):
"""Agent-side RPC for metadata agent-to-plugin interaction.
@ -66,41 +51,17 @@ class MetadataPluginAPI(base_agent_rpc.BasePluginApi):
version='1.0')
class MetadataProxyHandler(object):
class MetadataProxyHandler(proxy_base.MetadataProxyHandlerBase):
NETWORK_ID_HEADER = 'X-Neutron-Network-ID'
ROUTER_ID_HEADER = 'X-Neutron-Router-ID'
def __init__(self, conf):
self.conf = conf
self._cache = cache.get_cache(self.conf)
self._cache = cache.get_cache(conf)
super().__init__(conf, has_cache=True)
self.plugin_rpc = MetadataPluginAPI(topics.PLUGIN)
self.context = context.get_admin_context_without_session()
@webob.dec.wsgify(RequestClass=webob.Request)
def __call__(self, req):
try:
LOG.debug("Request: %s", req)
instance_id, tenant_id = self._get_instance_and_tenant_id(req)
if instance_id:
res = self._proxy_request(instance_id, tenant_id, req)
if isinstance(res, webob.exc.HTTPNotFound):
LOG.info("The instance: %s is not present anymore, "
"skipping cache...", instance_id)
instance_id, tenant_id = self._get_instance_and_tenant_id(
req, skip_cache=True)
if instance_id:
return self._proxy_request(instance_id, tenant_id, req)
return res
else:
return webob.exc.HTTPNotFound()
except Exception:
LOG.exception("Unexpected error.")
msg = _('An unknown error has occurred. '
'Please try your request again.')
explanation = str(msg)
return webob.exc.HTTPInternalServerError(explanation=explanation)
def _get_ports_from_server(self, router_id=None, ip_address=None,
networks=None, mac_address=None):
"""Get ports from server."""
@ -136,31 +97,24 @@ class MetadataProxyHandler(object):
@cache.cache_method_results
def _get_ports_for_remote_address(self, remote_address, networks,
skip_cache=False,
remote_mac=None):
"""Get list of ports that has given ip address and are part of
remote_mac=None,
skip_cache=False):
"""Get list of ports that has given IP address and are part of
given networks.
:param networks: list of networks in which the ip address will be
:param remote_address: IP address to search for
:param networks: List of networks in which the IP address will be
searched for
:param skip_cache: when have to skip getting entry from cache
:param remote_mac: Remote MAC to filter by, if given
:param skip_cache: When to skip getting entry from cache
"""
return self._get_ports_from_server(networks=networks,
ip_address=remote_address,
mac_address=remote_mac)
def _get_ports(self, remote_address, network_id=None, router_id=None,
skip_cache=False, remote_mac=None):
"""Search for all ports that contain passed ip address and belongs to
given network.
If no network is passed ports are searched on all networks connected to
given router. Either one of network_id or router_id must be passed.
:param skip_cache: when have to skip getting entry from cache
"""
def get_port(self, remote_address, network_id=None, remote_mac=None,
router_id=None, skip_cache=False):
if network_id:
networks = (network_id,)
elif router_id:
@ -168,127 +122,33 @@ class MetadataProxyHandler(object):
skip_cache=skip_cache)
else:
raise TypeError(_("Either one of parameter network_id or router_id"
" must be passed to _get_ports method."))
" must be passed to get_port method."))
return self._get_ports_for_remote_address(remote_address, networks,
skip_cache=skip_cache,
remote_mac=remote_mac)
def _get_instance_and_tenant_id(self, req, skip_cache=False):
forwarded_for = req.headers.get('X-Forwarded-For')
network_id = req.headers.get('X-Neutron-Network-ID')
router_id = req.headers.get('X-Neutron-Router-ID')
# Only one should be given, drop since it could be spoofed
if network_id and router_id:
LOG.debug("Both network and router IDs were specified in proxy "
"request, but only a single one of the two is allowed, "
"dropping")
return None, None
remote_mac = None
remote_ip = netaddr.IPAddress(forwarded_for)
if remote_ip.version == constants.IP_VERSION_6:
if remote_ip.is_ipv4_mapped():
# When haproxy listens on v4 AND v6 then it inserts ipv4
# addresses as ipv4-mapped v6 addresses into X-Forwarded-For.
forwarded_for = str(remote_ip.ipv4())
if remote_ip.is_link_local():
# When haproxy sees an ipv6 link-local client address
# (and sends that to us in X-Forwarded-For) we must rely
# on the EUI encoded in it, because that's all we can
# recognize.
remote_mac = str(netutils.get_mac_addr_by_ipv6(remote_ip))
ports = self._get_ports(
forwarded_for, network_id, router_id,
skip_cache=skip_cache, remote_mac=remote_mac)
LOG.debug("Gotten ports for remote_address %(remote_address)s, "
"network_id %(network_id)s, router_id %(router_id)s are: "
ports = self._get_ports_for_remote_address(remote_address, networks,
remote_mac=remote_mac,
skip_cache=skip_cache)
LOG.debug("Got ports for remote_address %(remote_address)s, "
"network_id %(network_id)s, remote_mac %(remote_mac)s, "
"router_id %(router_id)s"
"%(ports)s",
{"remote_address": forwarded_for,
{"remote_address": remote_address,
"network_id": network_id,
"remote_mac": remote_mac,
"router_id": router_id,
"ports": ports})
if len(ports) == 1:
num_ports = len(ports)
if num_ports == 1:
return ports[0]['device_id'], ports[0]['tenant_id']
elif num_ports == 0:
LOG.error("No port found in network %s with IP address %s",
network_id, remote_address)
return None, None
def _proxy_request(self, instance_id, tenant_id, req):
headers = {
'X-Forwarded-For': req.headers.get('X-Forwarded-For'),
'X-Instance-ID': instance_id,
'X-Tenant-ID': tenant_id,
'X-Instance-ID-Signature': common_utils.sign_instance_id(
self.conf, instance_id)
}
nova_host_port = ipv6_utils.valid_ipv6_url(
self.conf.nova_metadata_host,
self.conf.nova_metadata_port)
url = urllib.parse.urlunsplit((
self.conf.nova_metadata_protocol,
nova_host_port,
req.path_info,
req.query_string,
''))
disable_ssl_certificate_validation = self.conf.nova_metadata_insecure
if self.conf.auth_ca_cert and not disable_ssl_certificate_validation:
verify_cert = self.conf.auth_ca_cert
else:
verify_cert = not disable_ssl_certificate_validation
client_cert = None
if self.conf.nova_client_cert and self.conf.nova_client_priv_key:
client_cert = (self.conf.nova_client_cert,
self.conf.nova_client_priv_key)
try:
resp = requests.request(method=req.method, url=url,
headers=headers,
data=req.body,
cert=client_cert,
verify=verify_cert,
timeout=60)
except requests.ConnectionError:
msg = _('The remote metadata server is temporarily unavailable. '
'Please try again later.')
explanation = str(msg)
return webob.exc.HTTPServiceUnavailable(explanation=explanation)
if resp.status_code == 200:
req.response.content_type = resp.headers['content-type']
req.response.body = resp.content
LOG.debug(str(resp))
return req.response
elif resp.status_code == 403:
LOG.warning(
'The remote metadata server responded with Forbidden. This '
'response usually occurs when shared secrets do not match.'
)
return webob.exc.HTTPForbidden()
elif resp.status_code == 500:
msg = _(
'Remote metadata server experienced an internal server error.'
)
LOG.warning(msg)
explanation = str(msg)
return webob.exc.HTTPInternalServerError(explanation=explanation)
elif resp.status_code in (400, 404, 409, 502, 503, 504):
webob_exc_cls = webob.exc.status_map.get(resp.status_code)
return webob_exc_cls()
else:
raise Exception(_('Unexpected response code: %s') %
resp.status_code)
class UnixDomainMetadataProxy(object):
class UnixDomainMetadataProxy(proxy_base.UnixDomainMetadataProxyBase):
def __init__(self, conf):
self.conf = conf
super().__init__(conf)
agent_utils.ensure_directory_exists_without_file(
cfg.CONF.metadata_proxy_socket)
@ -335,24 +195,6 @@ class UnixDomainMetadataProxy(object):
LOG.info('Successfully reported state after a previous failure.')
self.agent_state.pop('start_flag', None)
def _get_socket_mode(self):
mode = self.conf.metadata_proxy_socket_mode
if mode == config.DEDUCE_MODE:
user = self.conf.metadata_proxy_user
if (not user or user == '0' or user == 'root' or
agent_utils.is_effective_user(user)):
# user is agent effective user or root => USER_MODE
mode = config.USER_MODE
else:
group = self.conf.metadata_proxy_group
if not group or agent_utils.is_effective_group(group):
# group is agent effective group => GROUP_MODE
mode = config.GROUP_MODE
else:
# otherwise => ALL_MODE
mode = config.ALL_MODE
return MODE_MAP[mode]
def run(self):
server = agent_utils.UnixDomainWSGIServer(
constants.AGENT_PROCESS_METADATA)

View File

@ -0,0 +1,224 @@
# Copyright 2012 New Dream Network, LLC (DreamHost)
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import urllib
import netaddr
from neutron_lib import constants
from oslo_log import log as logging
from oslo_utils import netutils
import requests
import webob
from neutron._i18n import _
from neutron.agent.linux import utils as agent_utils
from neutron.common import ipv6_utils
from neutron.common import utils as common_utils
from neutron.conf.agent.metadata import config
LOG = logging.getLogger(__name__)
MODE_MAP = {
config.USER_MODE: 0o644,
config.GROUP_MODE: 0o664,
config.ALL_MODE: 0o666,
}
class MetadataProxyHandlerBase(object, metaclass=abc.ABCMeta):
NETWORK_ID_HEADER = None
ROUTER_ID_HEADER = None
def __init__(self, conf, has_cache=False):
self.conf = conf
self._has_cache = has_cache
@abc.abstractmethod
def get_port(self, remote_address, network_id=None, remote_mac=None,
router_id=None, skip_cache=False):
"""Search for a single port that contain the given IP address and
belongs to the given network.
If no network is passed, ports are searched on all networks connected
to a given router. Either one of network_id or router_id must be given.
:param remote_address: IP address to search for
:param network_id: Network ID to filter by, if given
:param remote_mac: Remote MAC to filter by, if given
:param router_id: Router ID to filter by, if given
:param skip_cache: When to skip getting entry from cache
"""
pass
@webob.dec.wsgify(RequestClass=webob.Request)
def __call__(self, req):
try:
LOG.debug("Request: %s", req)
instance_id, project_id = self._get_instance_and_project_id(req)
if instance_id:
res = self._proxy_request(instance_id, project_id, req)
if isinstance(res, webob.exc.HTTPNotFound) and self._has_cache:
LOG.info("The instance: %s is not present anymore, "
"skipping cache...", instance_id)
instance_id, project_id = (
self._get_instance_and_project_id(req,
skip_cache=True))
if instance_id:
res = self._proxy_request(instance_id, project_id, req)
return res
else:
return webob.exc.HTTPNotFound()
except Exception:
LOG.exception("Unexpected error.")
msg = _('An unknown error has occurred. '
'Please try your request again.')
explanation = str(msg)
return webob.exc.HTTPInternalServerError(explanation=explanation)
def _get_instance_and_project_id(self, req, skip_cache=False):
forwarded_for = req.headers.get('X-Forwarded-For')
network_id = req.headers.get(self.NETWORK_ID_HEADER)
router_id = (req.headers.get(self.ROUTER_ID_HEADER)
if self.ROUTER_ID_HEADER else None)
# Only one should be given, drop since it could be spoofed
if network_id and router_id:
LOG.debug("Both network and router IDs were specified in proxy "
"request, but only a single one of the two is allowed, "
"dropping")
return None, None
remote_mac = None
remote_ip = netaddr.IPAddress(forwarded_for)
if remote_ip.version == constants.IP_VERSION_6:
if remote_ip.is_ipv4_mapped():
# When haproxy listens on v4 AND v6 then it inserts ipv4
# addresses as ipv4-mapped v6 addresses into X-Forwarded-For.
forwarded_for = str(remote_ip.ipv4())
if remote_ip.is_link_local():
# When haproxy sees an ipv6 link-local client address
# (and sends that to us in X-Forwarded-For) we must rely
# on the EUI encoded in it, because that's all we can
# recognize.
remote_mac = str(netutils.get_mac_addr_by_ipv6(remote_ip))
instance_id, project_id = self.get_port(forwarded_for,
network_id=network_id,
remote_mac=remote_mac,
router_id=router_id,
skip_cache=skip_cache)
return instance_id, project_id
def _proxy_request(self, instance_id, project_id, req):
headers = {
'X-Forwarded-For': req.headers.get('X-Forwarded-For'),
'X-Instance-ID': instance_id,
'X-Tenant-ID': project_id,
'X-Instance-ID-Signature': common_utils.sign_instance_id(
self.conf, instance_id)
}
nova_host_port = ipv6_utils.valid_ipv6_url(
self.conf.nova_metadata_host,
self.conf.nova_metadata_port)
url = urllib.parse.urlunsplit((
self.conf.nova_metadata_protocol,
nova_host_port,
req.path_info,
req.query_string,
''))
disable_ssl_certificate_validation = self.conf.nova_metadata_insecure
if self.conf.auth_ca_cert and not disable_ssl_certificate_validation:
verify_cert = self.conf.auth_ca_cert
else:
verify_cert = not disable_ssl_certificate_validation
client_cert = None
if self.conf.nova_client_cert and self.conf.nova_client_priv_key:
client_cert = (self.conf.nova_client_cert,
self.conf.nova_client_priv_key)
try:
resp = requests.request(method=req.method, url=url,
headers=headers,
data=req.body,
cert=client_cert,
verify=verify_cert,
timeout=60)
except requests.ConnectionError:
msg = _('The remote metadata server is temporarily unavailable. '
'Please try again later.')
explanation = str(msg)
return webob.exc.HTTPServiceUnavailable(explanation=explanation)
if resp.status_code == 200:
req.response.content_type = resp.headers['content-type']
req.response.body = resp.content
LOG.debug(str(resp))
return req.response
elif resp.status_code == 403:
LOG.warning(
'The remote metadata server responded with Forbidden. This '
'response usually occurs when shared secrets do not match.'
)
return webob.exc.HTTPForbidden()
elif resp.status_code == 500:
msg = _(
'Remote metadata server experienced an internal server error.'
)
LOG.warning(msg)
explanation = str(msg)
return webob.exc.HTTPInternalServerError(explanation=explanation)
elif resp.status_code in (400, 404, 409, 502, 503, 504):
webob_exc_cls = webob.exc.status_map.get(resp.status_code)
return webob_exc_cls()
else:
raise Exception(_('Unexpected response code: %s') %
resp.status_code)
class UnixDomainMetadataProxyBase(object, metaclass=abc.ABCMeta):
def __init__(self, conf):
self.conf = conf
def _get_socket_mode(self):
mode = self.conf.metadata_proxy_socket_mode
if mode == config.DEDUCE_MODE:
user = self.conf.metadata_proxy_user
if (not user or user == '0' or user == 'root' or
agent_utils.is_effective_user(user)):
# user is agent effective user or root => USER_MODE
mode = config.USER_MODE
else:
group = self.conf.metadata_proxy_group
if not group or agent_utils.is_effective_group(group):
# group is agent effective group => GROUP_MODE
mode = config.GROUP_MODE
else:
# otherwise => ALL_MODE
mode = config.ALL_MODE
return MODE_MAP[mode]
@abc.abstractmethod
def run(self):
pass

View File

@ -13,41 +13,26 @@
# limitations under the License.
import threading
import urllib
import netaddr
from neutron._i18n import _
from neutron.agent.linux import utils as agent_utils
from neutron.agent.metadata import proxy_base
from neutron.agent.ovn.metadata import ovsdb
from neutron.common import ipv6_utils
from neutron.common.ovn import constants as ovn_const
from neutron.common import utils as common_utils
from neutron.conf.agent.metadata import config
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import netutils
import requests
import webob
LOG = logging.getLogger(__name__)
MODE_MAP = {
config.USER_MODE: 0o644,
config.GROUP_MODE: 0o664,
config.ALL_MODE: 0o666,
}
class MetadataProxyHandler(object):
class MetadataProxyHandler(proxy_base.MetadataProxyHandlerBase):
NETWORK_ID_HEADER = 'X-OVN-Network-ID'
def __init__(self, conf, chassis, sb_idl):
self.conf = conf
super().__init__(conf)
self.chassis = chassis
self._sb_idl = sb_idl
self._post_fork_event = threading.Event()
@ -80,44 +65,10 @@ class MetadataProxyHandler(object):
# Now IDL connections can be safely used.
self._post_fork_event.set()
@webob.dec.wsgify(RequestClass=webob.Request)
def __call__(self, req):
try:
LOG.debug("Request: %s", req)
instance_id, project_id = self._get_instance_and_project_id(req)
if instance_id:
return self._proxy_request(instance_id, project_id, req)
else:
return webob.exc.HTTPNotFound()
except Exception:
LOG.exception("Unexpected error.")
msg = _('An unknown error has occurred. '
'Please try your request again.')
explanation = str(msg)
return webob.exc.HTTPInternalServerError(explanation=explanation)
def _get_instance_and_project_id(self, req):
forwarded_for = req.headers.get('X-Forwarded-For')
network_id = req.headers.get('X-OVN-Network-ID')
remote_mac = None
remote_ip = netaddr.IPAddress(forwarded_for)
if remote_ip.version == constants.IP_VERSION_6:
if remote_ip.is_ipv4_mapped():
# When haproxy listens on v4 AND v6 then it inserts ipv4
# addresses as ipv4-mapped v6 addresses into X-Forwarded-For.
forwarded_for = str(remote_ip.ipv4())
if remote_ip.is_link_local():
# When haproxy sees an ipv6 link-local client address
# (and sends that to us in X-Forwarded-For) we must rely
# on the EUI encoded in it, because that's all we can
# recognize.
remote_mac = str(netutils.get_mac_addr_by_ipv6(remote_ip))
def get_port(self, remote_address, network_id=None, remote_mac=None,
router_id=None, skip_cache=False):
ports = self.sb_idl.get_network_port_bindings_by_ip(network_id,
forwarded_for,
remote_address,
mac=remote_mac)
num_ports = len(ports)
if num_ports == 1:
@ -126,114 +77,26 @@ class MetadataProxyHandler(object):
external_ids[ovn_const.OVN_PROJID_EXT_ID_KEY])
elif num_ports == 0:
LOG.error("No port found in network %s with IP address %s",
network_id, forwarded_for)
network_id, remote_address)
elif num_ports > 1:
port_uuids = ', '.join([str(port.uuid) for port in ports])
LOG.error("More than one port found in network %s with IP address "
"%s. Please run the neutron-ovn-db-sync-util script as "
"there seems to be inconsistent data between Neutron "
"and OVN databases. OVN Port uuids: %s", network_id,
forwarded_for, port_uuids)
remote_address, port_uuids)
return None, None
def _proxy_request(self, instance_id, tenant_id, req):
headers = {
'X-Forwarded-For': req.headers.get('X-Forwarded-For'),
'X-Instance-ID': instance_id,
'X-Tenant-ID': tenant_id,
'X-Instance-ID-Signature': common_utils.sign_instance_id(
self.conf, instance_id)
}
nova_host_port = ipv6_utils.valid_ipv6_url(
self.conf.nova_metadata_host,
self.conf.nova_metadata_port)
url = urllib.parse.urlunsplit((
self.conf.nova_metadata_protocol,
nova_host_port,
req.path_info,
req.query_string,
''))
disable_ssl_certificate_validation = self.conf.nova_metadata_insecure
if self.conf.auth_ca_cert and not disable_ssl_certificate_validation:
verify_cert = self.conf.auth_ca_cert
else:
verify_cert = not disable_ssl_certificate_validation
client_cert = None
if self.conf.nova_client_cert and self.conf.nova_client_priv_key:
client_cert = (self.conf.nova_client_cert,
self.conf.nova_client_priv_key)
try:
resp = requests.request(method=req.method, url=url,
headers=headers,
data=req.body,
cert=client_cert,
verify=verify_cert,
timeout=60)
except requests.ConnectionError:
msg = _('The remote metadata server is temporarily unavailable. '
'Please try again later.')
explanation = str(msg)
return webob.exc.HTTPServiceUnavailable(explanation=explanation)
if resp.status_code == 200:
req.response.content_type = resp.headers['content-type']
req.response.body = resp.content
LOG.debug(str(resp))
return req.response
elif resp.status_code == 403:
LOG.warning(
'The remote metadata server responded with Forbidden. This '
'response usually occurs when shared secrets do not match.'
)
return webob.exc.HTTPForbidden()
elif resp.status_code == 500:
msg = _(
'Remote metadata server experienced an internal server error.'
)
LOG.warning(msg)
explanation = str(msg)
return webob.exc.HTTPInternalServerError(explanation=explanation)
elif resp.status_code in (400, 404, 409, 502, 503, 504):
webob_exc_cls = webob.exc.status_map.get(resp.status_code)
return webob_exc_cls()
else:
raise Exception(_('Unexpected response code: %s') %
resp.status_code)
class UnixDomainMetadataProxy(object):
class UnixDomainMetadataProxy(proxy_base.UnixDomainMetadataProxyBase):
def __init__(self, conf, chassis, sb_idl=None):
self.conf = conf
super().__init__(conf)
self.chassis = chassis
self.sb_idl = sb_idl
agent_utils.ensure_directory_exists_without_file(
cfg.CONF.metadata_proxy_socket)
def _get_socket_mode(self):
mode = self.conf.metadata_proxy_socket_mode
if mode == config.DEDUCE_MODE:
user = self.conf.metadata_proxy_user
if (not user or user == '0' or user == 'root' or
agent_utils.is_effective_user(user)):
# user is agent effective user or root => USER_MODE
mode = config.USER_MODE
else:
group = self.conf.metadata_proxy_group
if not group or agent_utils.is_effective_group(group):
# group is agent effective group => GROUP_MODE
mode = config.GROUP_MODE
else:
# otherwise => ALL_MODE
mode = config.ALL_MODE
return MODE_MAP[mode]
def run(self):
self.server = agent_utils.UnixDomainWSGIServer(
'neutron-ovn-metadata-agent')

View File

@ -17,7 +17,6 @@ from unittest import mock
import ddt
import netaddr
from neutron_lib import constants as n_const
import requests
import testtools
import webob
@ -28,6 +27,7 @@ from oslo_utils import netutils
from neutron.agent.linux import utils as agent_utils
from neutron.agent.metadata import agent
from neutron.agent.metadata import proxy_base
from neutron.agent import metadata_agent
from neutron.common import cache_utils as cache
from neutron.common import utils
@ -38,16 +38,6 @@ from neutron.tests import base
class ConfFixture(config_fixture.Config):
def setUp(self):
super(ConfFixture, self).setUp()
meta_conf.register_meta_conf_opts(
meta_conf.METADATA_PROXY_HANDLER_OPTS, self.conf)
self.config(auth_ca_cert=None,
nova_metadata_host='9.9.9.9',
nova_metadata_port=8775,
metadata_proxy_shared_secret='secret',
nova_metadata_protocol='http',
nova_metadata_insecure=True,
nova_client_cert='nova_cert',
nova_client_priv_key='nova_priv_key')
cache.register_oslo_configs(self.conf)
@ -68,7 +58,7 @@ class TestMetadataProxyHandlerBase(base.BaseTestCase):
def setUp(self):
super(TestMetadataProxyHandlerBase, self).setUp()
self.useFixture(self.fake_conf_fixture)
self.log_p = mock.patch.object(agent, 'LOG')
self.log_p = mock.patch.object(proxy_base, 'LOG')
self.log = self.log_p.start()
self.handler = agent.MetadataProxyHandler(self.fake_conf)
self.handler.plugin_rpc = mock.Mock()
@ -124,7 +114,7 @@ class _TestMetadataProxyHandlerCacheMixin(object):
def test_call(self):
req = mock.Mock()
with mock.patch.object(self.handler,
'_get_instance_and_tenant_id') as get_ids:
'_get_instance_and_project_id') as get_ids:
get_ids.return_value = ('instance_id', 'tenant_id')
with mock.patch.object(self.handler, '_proxy_request') as proxy:
proxy.return_value = 'value'
@ -135,7 +125,7 @@ class _TestMetadataProxyHandlerCacheMixin(object):
def test_call_skip_cache(self):
req = mock.Mock()
with mock.patch.object(self.handler,
'_get_instance_and_tenant_id') as get_ids:
'_get_instance_and_project_id') as get_ids:
get_ids.return_value = ('instance_id', 'tenant_id')
with mock.patch.object(self.handler, '_proxy_request') as proxy:
proxy.return_value = webob.exc.HTTPNotFound()
@ -145,7 +135,7 @@ class _TestMetadataProxyHandlerCacheMixin(object):
def test_call_no_instance_match(self):
req = mock.Mock()
with mock.patch.object(self.handler,
'_get_instance_and_tenant_id') as get_ids:
'_get_instance_and_project_id') as get_ids:
get_ids.return_value = None, None
retval = self.handler(req)
self.assertIsInstance(retval, webob.exc.HTTPNotFound)
@ -153,7 +143,7 @@ class _TestMetadataProxyHandlerCacheMixin(object):
def test_call_internal_server_error(self):
req = mock.Mock()
with mock.patch.object(self.handler,
'_get_instance_and_tenant_id') as get_ids:
'_get_instance_and_project_id') as get_ids:
get_ids.side_effect = Exception
retval = self.handler(req)
self.assertIsInstance(retval, webob.exc.HTTPInternalServerError)
@ -213,51 +203,58 @@ class _TestMetadataProxyHandlerCacheMixin(object):
self.assertEqual(
1, self.handler.plugin_rpc.get_ports.call_count)
def test_get_ports_network_id(self):
def test_get_port_network_id(self):
network_id = 'network-id'
router_id = 'router-id'
remote_address = 'remote-address'
expected = ['port1']
expected = ('device1', 'tenant1')
ports = [
{'device_id': 'device1', 'tenant_id': 'tenant1',
'network_id': 'network1'}
]
networks = (network_id,)
with mock.patch.object(self.handler,
'_get_ports_for_remote_address'
'_get_ports_for_remote_address',
return_value=ports
) as mock_get_ip_addr,\
mock.patch.object(self.handler,
'_get_router_networks'
) as mock_get_router_networks:
mock_get_ip_addr.return_value = expected
ports = self.handler._get_ports(remote_address, network_id,
router_id)
port = self.handler.get_port(remote_address, network_id,
router_id=router_id)
mock_get_ip_addr.assert_called_once_with(remote_address,
networks,
remote_mac=None,
skip_cache=False)
self.assertFalse(mock_get_router_networks.called)
self.assertEqual(expected, ports)
self.assertEqual(expected, port)
def test_get_ports_router_id(self):
def test_get_port_router_id(self):
router_id = 'router-id'
remote_address = 'remote-address'
expected = ['port1']
expected = ('device1', 'tenant1')
ports = [
{'device_id': 'device1', 'tenant_id': 'tenant1',
'network_id': 'network1'}
]
networks = ('network1', 'network2')
with mock.patch.object(self.handler,
'_get_ports_for_remote_address',
return_value=expected
return_value=ports
) as mock_get_ip_addr,\
mock.patch.object(self.handler,
'_get_router_networks',
return_value=networks
) as mock_get_router_networks:
ports = self.handler._get_ports(remote_address,
router_id=router_id)
port = self.handler.get_port(remote_address, router_id=router_id)
mock_get_router_networks.assert_called_once_with(
router_id, skip_cache=False)
mock_get_ip_addr.assert_called_once_with(
remote_address, networks, remote_mac=None, skip_cache=False)
self.assertEqual(expected, ports)
self.assertEqual(expected, port)
def test_get_ports_no_id(self):
self.assertRaises(TypeError, self.handler._get_ports, 'remote_address')
def test_get_port_no_id(self):
self.assertRaises(TypeError, self.handler.get_port, 'remote_address')
def _get_instance_and_tenant_id_helper(self, headers, list_ports_retval,
networks=None, router_id=None,
@ -269,7 +266,7 @@ class _TestMetadataProxyHandlerCacheMixin(object):
return list_ports_retval.pop(0)
self.handler.plugin_rpc.get_ports.side_effect = mock_get_ports
instance_id, tenant_id = self.handler._get_instance_and_tenant_id(req)
instance_id, tenant_id = self.handler._get_instance_and_project_id(req)
expected = []
@ -407,93 +404,6 @@ class _TestMetadataProxyHandlerCacheMixin(object):
remote_address=remote_address)
)
def _proxy_request_test_helper(self, response_code=200, method='GET'):
hdrs = {'X-Forwarded-For': '8.8.8.8'}
body = 'body'
req = mock.Mock(path_info='/the_path', query_string='', headers=hdrs,
method=method, body=body)
resp = mock.MagicMock(status_code=response_code)
resp.status.__str__.side_effect = AttributeError
resp.content = 'content'
req.response = resp
with mock.patch.object(utils, 'sign_instance_id') as sign:
sign.return_value = 'signed'
with mock.patch('requests.request') as mock_request:
resp.headers = {'content-type': 'text/plain'}
mock_request.return_value = resp
retval = self.handler._proxy_request('the_id', 'tenant_id',
req)
mock_request.assert_called_once_with(
method=method, url='http://9.9.9.9:8775/the_path',
headers={
'X-Forwarded-For': '8.8.8.8',
'X-Instance-ID-Signature': 'signed',
'X-Instance-ID': 'the_id',
'X-Tenant-ID': 'tenant_id'
},
data=body,
cert=(self.fake_conf.nova_client_cert,
self.fake_conf.nova_client_priv_key),
verify=False,
timeout=60)
return retval
def test_proxy_request_post(self):
response = self._proxy_request_test_helper(method='POST')
self.assertEqual('text/plain', response.content_type)
self.assertEqual('content', response.body)
def test_proxy_request_200(self):
response = self._proxy_request_test_helper(200)
self.assertEqual('text/plain', response.content_type)
self.assertEqual('content', response.body)
def test_proxy_request_400(self):
self.assertIsInstance(self._proxy_request_test_helper(400),
webob.exc.HTTPBadRequest)
def test_proxy_request_403(self):
self.assertIsInstance(self._proxy_request_test_helper(403),
webob.exc.HTTPForbidden)
def test_proxy_request_404(self):
self.assertIsInstance(self._proxy_request_test_helper(404),
webob.exc.HTTPNotFound)
def test_proxy_request_409(self):
self.assertIsInstance(self._proxy_request_test_helper(409),
webob.exc.HTTPConflict)
def test_proxy_request_500(self):
self.assertIsInstance(self._proxy_request_test_helper(500),
webob.exc.HTTPInternalServerError)
def test_proxy_request_502(self):
self.assertIsInstance(self._proxy_request_test_helper(502),
webob.exc.HTTPBadGateway)
def test_proxy_request_503(self):
self.assertIsInstance(self._proxy_request_test_helper(503),
webob.exc.HTTPServiceUnavailable)
def test_proxy_request_504(self):
self.assertIsInstance(self._proxy_request_test_helper(504),
webob.exc.HTTPGatewayTimeout)
def test_proxy_request_other_code(self):
with testtools.ExpectedException(Exception):
self._proxy_request_test_helper(302)
def test_proxy_request_conenction_error(self):
req = mock.Mock(path_info='/the_path', query_string='', headers={},
method='GET', body='')
with mock.patch('requests.request') as mock_request:
mock_request.side_effect = requests.ConnectionError()
retval = self.handler._proxy_request('the_id', 'tenant_id', req)
self.assertIsInstance(retval, webob.exc.HTTPServiceUnavailable)
class TestMetadataProxyHandlerNewCache(TestMetadataProxyHandlerBase,
_TestMetadataProxyHandlerCacheMixin):

View File

@ -0,0 +1,203 @@
# Copyright 2024 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import requests
import testtools
import webob
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from neutron.agent.metadata import proxy_base
from neutron.common import utils
from neutron.conf.agent.metadata import config as meta_conf
from neutron.tests import base
class ConfFixture(config_fixture.Config):
def setUp(self):
super().setUp()
meta_conf.register_meta_conf_opts(
meta_conf.SHARED_OPTS, self.conf)
meta_conf.register_meta_conf_opts(
meta_conf.METADATA_PROXY_HANDLER_OPTS, self.conf)
meta_conf.register_meta_conf_opts(
meta_conf.UNIX_DOMAIN_METADATA_PROXY_OPTS, self.conf)
self.config(auth_ca_cert=None,
nova_metadata_host='9.9.9.9',
nova_metadata_port=8775,
metadata_proxy_shared_secret='secret',
nova_metadata_protocol='http',
nova_metadata_insecure=True,
nova_client_cert='nova_cert',
nova_client_priv_key='nova_priv_key')
class FakeMetadataProxyHandler(proxy_base.MetadataProxyHandlerBase):
def __init__(self, conf):
super().__init__(conf)
def get_port(self, remote_address, network_id=None, remote_mac=None,
router_id=None, skip_cache=False):
# This is an abstractmethod so must be defined
return None, None
class TestMetadataProxyHandlerBase(base.BaseTestCase):
fake_conf = cfg.CONF
fake_conf_fixture = ConfFixture(fake_conf)
def setUp(self):
super().setUp()
self.useFixture(self.fake_conf_fixture)
self.handler = FakeMetadataProxyHandler(self.fake_conf)
def _proxy_request_test_helper(self, response_code=200, method='GET'):
hdrs = {'X-Forwarded-For': '8.8.8.8'}
body = 'body'
req = mock.Mock(path_info='/the_path', query_string='', headers=hdrs,
method=method, body=body)
resp = mock.MagicMock(status_code=response_code)
resp.status.__str__.side_effect = AttributeError
resp.content = 'content'
req.response = resp
with mock.patch.object(utils, 'sign_instance_id') as sign:
sign.return_value = 'signed'
with mock.patch('requests.request') as mock_request:
resp.headers = {'content-type': 'text/plain'}
mock_request.return_value = resp
retval = self.handler._proxy_request('the_id', 'tenant_id',
req)
mock_request.assert_called_once_with(
method=method, url='http://9.9.9.9:8775/the_path',
headers={
'X-Forwarded-For': '8.8.8.8',
'X-Instance-ID-Signature': 'signed',
'X-Instance-ID': 'the_id',
'X-Tenant-ID': 'tenant_id'
},
data=body,
cert=(self.fake_conf.nova_client_cert,
self.fake_conf.nova_client_priv_key),
verify=False,
timeout=60)
return retval
def test_proxy_request_post(self):
response = self._proxy_request_test_helper(method='POST')
self.assertEqual('text/plain', response.content_type)
self.assertEqual('content', response.body)
def test_proxy_request_200(self):
response = self._proxy_request_test_helper(200)
self.assertEqual('text/plain', response.content_type)
self.assertEqual('content', response.body)
def test_proxy_request_400(self):
self.assertIsInstance(self._proxy_request_test_helper(400),
webob.exc.HTTPBadRequest)
def test_proxy_request_403(self):
self.assertIsInstance(self._proxy_request_test_helper(403),
webob.exc.HTTPForbidden)
def test_proxy_request_404(self):
self.assertIsInstance(self._proxy_request_test_helper(404),
webob.exc.HTTPNotFound)
def test_proxy_request_409(self):
self.assertIsInstance(self._proxy_request_test_helper(409),
webob.exc.HTTPConflict)
def test_proxy_request_500(self):
self.assertIsInstance(self._proxy_request_test_helper(500),
webob.exc.HTTPInternalServerError)
def test_proxy_request_502(self):
self.assertIsInstance(self._proxy_request_test_helper(502),
webob.exc.HTTPBadGateway)
def test_proxy_request_503(self):
self.assertIsInstance(self._proxy_request_test_helper(503),
webob.exc.HTTPServiceUnavailable)
def test_proxy_request_504(self):
self.assertIsInstance(self._proxy_request_test_helper(504),
webob.exc.HTTPGatewayTimeout)
def test_proxy_request_other_code(self):
with testtools.ExpectedException(Exception):
self._proxy_request_test_helper(302)
def test_proxy_request_connection_error(self):
req = mock.Mock(path_info='/the_path', query_string='', headers={},
method='GET', body='')
with mock.patch('requests.request') as mock_request:
mock_request.side_effect = requests.ConnectionError()
retval = self.handler._proxy_request('the_id', 'tenant_id', req)
self.assertIsInstance(retval, webob.exc.HTTPServiceUnavailable)
class FakeUnixDomainMetadataProxy(proxy_base.UnixDomainMetadataProxyBase):
def __init__(self, conf):
super().__init__(conf)
def run(self):
# This is an abstractmethod so must be defined
pass
class TestUnixDomainMetadataProxyBase(base.BaseTestCase):
fake_conf = cfg.CONF
fake_conf_fixture = ConfFixture(fake_conf)
def setUp(self):
super().setUp()
self.useFixture(self.fake_conf_fixture)
self.proxy = FakeUnixDomainMetadataProxy(self.fake_conf)
def test__get_socket_mode_user(self):
self.fake_conf_fixture.config(
metadata_proxy_socket_mode=meta_conf.USER_MODE)
mode = self.proxy._get_socket_mode()
self.assertEqual(proxy_base.MODE_MAP[meta_conf.USER_MODE], mode)
def test__get_socket_mode_deduce_user_root(self):
self.fake_conf_fixture.config(
metadata_proxy_socket_mode=meta_conf.DEDUCE_MODE,
metadata_proxy_user='root')
mode = self.proxy._get_socket_mode()
self.assertEqual(proxy_base.MODE_MAP[meta_conf.USER_MODE], mode)
@mock.patch.object(proxy_base.agent_utils, 'is_effective_group')
@mock.patch.object(proxy_base.agent_utils, 'is_effective_user')
def test__get_socket_mode_deduce_group(self, mock_ieu, mock_ieg):
self.fake_conf_fixture.config(
metadata_proxy_socket_mode=meta_conf.DEDUCE_MODE,
metadata_proxy_user='fake',
metadata_proxy_group='fake')
# Always force non-effective user
mock_ieu.return_value = False
# group effective
mock_ieg.return_value = True
mode = self.proxy._get_socket_mode()
self.assertEqual(proxy_base.MODE_MAP[meta_conf.GROUP_MODE], mode)
# group non-effective
mock_ieg.return_value = False
mode = self.proxy._get_socket_mode()
self.assertEqual(proxy_base.MODE_MAP[meta_conf.ALL_MODE], mode)

View File

@ -18,15 +18,13 @@ from unittest import mock
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_utils import fileutils
import requests
import testtools
import webob
from neutron.agent.linux import utils as agent_utils
from neutron.agent.metadata import proxy_base
from neutron.agent.ovn.metadata import server as agent
from neutron.common import utils as common_utils
from neutron.conf.agent.metadata import config as meta_conf
from neutron.conf.agent.ovn.metadata import config as ovn_meta_conf
from neutron.tests import base
OvnPortInfo = collections.namedtuple(
@ -34,18 +32,7 @@ OvnPortInfo = collections.namedtuple(
class ConfFixture(config_fixture.Config):
def setUp(self):
super(ConfFixture, self).setUp()
ovn_meta_conf.register_meta_conf_opts(
meta_conf.METADATA_PROXY_HANDLER_OPTS, self.conf)
self.config(auth_ca_cert=None,
nova_metadata_host='9.9.9.9',
nova_metadata_port=8775,
metadata_proxy_shared_secret='secret',
nova_metadata_protocol='http',
nova_metadata_insecure=True,
nova_client_cert='nova_cert',
nova_client_priv_key='nova_priv_key')
pass
class TestMetadataProxyHandler(base.BaseTestCase):
@ -55,7 +42,7 @@ class TestMetadataProxyHandler(base.BaseTestCase):
def setUp(self):
super(TestMetadataProxyHandler, self).setUp()
self.useFixture(self.fake_conf_fixture)
self.log_p = mock.patch.object(agent, 'LOG')
self.log_p = mock.patch.object(proxy_base, 'LOG')
self.log = self.log_p.start()
self.handler = agent.MetadataProxyHandler(self.fake_conf, 'chassis1',
mock.Mock())
@ -170,93 +157,6 @@ class TestMetadataProxyHandler(base.BaseTestCase):
ports)
self.assertEqual(expected, observed)
def _proxy_request_test_helper(self, response_code=200, method='GET'):
hdrs = {'X-Forwarded-For': '8.8.8.8'}
body = 'body'
req = mock.Mock(path_info='/the_path', query_string='', headers=hdrs,
method=method, body=body)
resp = mock.MagicMock(status_code=response_code)
resp.status.__str__.side_effect = AttributeError
resp.content = 'content'
req.response = resp
with mock.patch.object(common_utils, 'sign_instance_id') as sign:
sign.return_value = 'signed'
with mock.patch('requests.request') as mock_request:
resp.headers = {'content-type': 'text/plain'}
mock_request.return_value = resp
retval = self.handler._proxy_request('the_id', 'tenant_id',
req)
mock_request.assert_called_once_with(
method=method, url='http://9.9.9.9:8775/the_path',
headers={
'X-Forwarded-For': '8.8.8.8',
'X-Instance-ID-Signature': 'signed',
'X-Instance-ID': 'the_id',
'X-Tenant-ID': 'tenant_id'
},
data=body,
cert=(self.fake_conf.nova_client_cert,
self.fake_conf.nova_client_priv_key),
verify=False,
timeout=60)
return retval
def test_proxy_request_post(self):
response = self._proxy_request_test_helper(method='POST')
self.assertEqual(response.content_type, "text/plain")
self.assertEqual(response.body, 'content')
def test_proxy_request_200(self):
response = self._proxy_request_test_helper(200)
self.assertEqual(response.content_type, "text/plain")
self.assertEqual(response.body, 'content')
def test_proxy_request_400(self):
self.assertIsInstance(self._proxy_request_test_helper(400),
webob.exc.HTTPBadRequest)
def test_proxy_request_403(self):
self.assertIsInstance(self._proxy_request_test_helper(403),
webob.exc.HTTPForbidden)
def test_proxy_request_404(self):
self.assertIsInstance(self._proxy_request_test_helper(404),
webob.exc.HTTPNotFound)
def test_proxy_request_409(self):
self.assertIsInstance(self._proxy_request_test_helper(409),
webob.exc.HTTPConflict)
def test_proxy_request_500(self):
self.assertIsInstance(self._proxy_request_test_helper(500),
webob.exc.HTTPInternalServerError)
def test_proxy_request_502(self):
self.assertIsInstance(self._proxy_request_test_helper(502),
webob.exc.HTTPBadGateway)
def test_proxy_request_503(self):
self.assertIsInstance(self._proxy_request_test_helper(503),
webob.exc.HTTPServiceUnavailable)
def test_proxy_request_504(self):
self.assertIsInstance(self._proxy_request_test_helper(504),
webob.exc.HTTPGatewayTimeout)
def test_proxy_request_other_code(self):
with testtools.ExpectedException(Exception):
self._proxy_request_test_helper(302)
def test_proxy_request_conenction_error(self):
req = mock.Mock(path_info='/the_path', query_string='', headers={},
method='GET', body='')
with mock.patch('requests.request') as mock_request:
mock_request.side_effect = requests.ConnectionError()
retval = self.handler._proxy_request('the_id', 'tenant_id', req)
self.assertIsInstance(retval, webob.exc.HTTPServiceUnavailable)
class TestUnixDomainMetadataProxy(base.BaseTestCase):
def setUp(self):