neutron/neutron/agent/metadata/agent.py

212 lines
8.6 KiB
Python

# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.agent import topics
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.utils import host
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from neutron._i18n import _
from neutron.agent.common import base_agent_rpc
from neutron.agent.linux import utils as agent_utils
from neutron.agent.metadata import proxy_base
from neutron.agent import rpc as agent_rpc
from neutron.common import cache_utils as cache
LOG = logging.getLogger(__name__)
class MetadataPluginAPI(base_agent_rpc.BasePluginApi):
"""Agent-side RPC for metadata agent-to-plugin interaction.
This class implements the client side of an rpc interface used by the
metadata service to make calls back into the Neutron plugin. The server
side is defined in
neutron.api.rpc.handlers.metadata_rpc.MetadataRpcCallback. For more
information about changing rpc interfaces, see
doc/source/contributor/internals/rpc_api.rst.
API version history:
1.0 - Initial version.
"""
def __init__(self, topic):
super().__init__(
topic=topic,
namespace=constants.RPC_NAMESPACE_METADATA,
version='1.0')
class MetadataProxyHandler(proxy_base.MetadataProxyHandlerBase):
NETWORK_ID_HEADER = 'X-Neutron-Network-ID'
ROUTER_ID_HEADER = 'X-Neutron-Router-ID'
def __init__(self, conf):
self._cache = cache.get_cache(conf)
super().__init__(conf, has_cache=True)
self.plugin_rpc = MetadataPluginAPI(topics.PLUGIN)
self.context = context.get_admin_context_without_session()
def _get_ports_from_server(self, router_id=None, ip_address=None,
networks=None, mac_address=None):
"""Get ports from server."""
filters = self._get_port_filters(
router_id, ip_address, networks, mac_address)
return self.plugin_rpc.get_ports(self.context, filters)
def _get_port_filters(self, router_id=None, ip_address=None,
networks=None, mac_address=None):
filters = {}
if router_id:
filters['device_id'] = [router_id]
filters['device_owner'] = constants.ROUTER_INTERFACE_OWNERS
# We either get an IP assigned (and therefore known) by neutron
# via X-Forwarded-For or that header contained a link-local
# IPv6 address of which neutron only knows the MAC address encoded
# in it. In the latter case the IPv6 address in X-Forwarded-For
# is not a fixed ip of the port.
if mac_address:
filters['mac_address'] = [mac_address]
elif ip_address:
filters['fixed_ips'] = {'ip_address': [ip_address]}
if networks:
filters['network_id'] = networks
return filters
@cache.cache_method_results
def _get_router_networks(self, router_id, skip_cache=False):
"""Find all networks connected to given router."""
internal_ports = self._get_ports_from_server(router_id=router_id)
return tuple(p['network_id'] for p in internal_ports)
@cache.cache_method_results
def _get_ports_for_remote_address(self, remote_address, networks,
remote_mac=None,
skip_cache=False):
"""Get list of ports that has given IP address and are part of
given networks.
:param remote_address: IP address to search for
:param networks: List of networks in which the IP address will be
searched for
:param remote_mac: Remote MAC to filter by, if given
:param skip_cache: When to skip getting entry from cache
"""
return self._get_ports_from_server(networks=networks,
ip_address=remote_address,
mac_address=remote_mac)
def get_port(self, remote_address, network_id=None, remote_mac=None,
router_id=None, skip_cache=False):
if network_id:
networks = (network_id,)
elif router_id:
networks = self._get_router_networks(router_id,
skip_cache=skip_cache)
else:
raise TypeError(_("Either one of parameter network_id or router_id"
" must be passed to get_port method."))
ports = self._get_ports_for_remote_address(remote_address, networks,
remote_mac=remote_mac,
skip_cache=skip_cache)
LOG.debug("Got ports for remote_address %(remote_address)s, "
"network_id %(network_id)s, remote_mac %(remote_mac)s, "
"router_id %(router_id)s"
"%(ports)s",
{"remote_address": remote_address,
"network_id": network_id,
"remote_mac": remote_mac,
"router_id": router_id,
"ports": ports})
num_ports = len(ports)
if num_ports == 1:
return ports[0]['device_id'], ports[0]['tenant_id']
elif num_ports == 0:
LOG.error("No port found in network %s with IP address %s",
network_id, remote_address)
return None, None
class UnixDomainMetadataProxy(proxy_base.UnixDomainMetadataProxyBase):
def __init__(self, conf):
super().__init__(conf)
agent_utils.ensure_directory_exists_without_file(
cfg.CONF.metadata_proxy_socket)
def _init_state_reporting(self):
self.context = context.get_admin_context_without_session()
self.failed_state_report = False
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
self.agent_state = {
'binary': constants.AGENT_PROCESS_METADATA,
'host': cfg.CONF.host,
'topic': 'N/A',
'configurations': {
'metadata_proxy_socket': cfg.CONF.metadata_proxy_socket,
'nova_metadata_host': cfg.CONF.nova_metadata_host,
'nova_metadata_port': cfg.CONF.nova_metadata_port,
'log_agent_heartbeats': cfg.CONF.AGENT.log_agent_heartbeats,
},
'start_flag': True,
'agent_type': constants.AGENT_TYPE_METADATA}
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
self.heartbeat.start(interval=report_interval)
def _report_state(self):
try:
self.state_rpc.report_state(
self.context,
self.agent_state,
use_call=self.agent_state.get('start_flag'))
except AttributeError:
# This means the server does not support report_state
LOG.warning('Neutron server does not support state report.'
' State report for this agent will be disabled.')
self.heartbeat.stop()
return
except Exception:
self.failed_state_report = True
LOG.exception("Failed reporting state!")
return
if self.failed_state_report:
self.failed_state_report = False
LOG.info('Successfully reported state after a previous failure.')
self.agent_state.pop('start_flag', None)
def run(self):
server = agent_utils.UnixDomainWSGIServer(
constants.AGENT_PROCESS_METADATA)
# Set the default metadata_workers if not yet set in the config file
md_workers = self.conf.metadata_workers
if md_workers is None:
md_workers = host.cpu_count() // 2
server.start(MetadataProxyHandler(self.conf),
self.conf.metadata_proxy_socket,
workers=md_workers,
backlog=self.conf.metadata_backlog,
mode=self._get_socket_mode())
self._init_state_reporting()
server.wait()