RPC Callback rolling upgrades reporting, and integration
This is the second patch to allow upgrades on RPC versioned objects callbacks. This enables resource version notifications from agents to all neutron servers via fanout for updating the version sets in memory, and via agent status updates for DB storage, so any neutron server can retrieve such information at boot. Closes-Bug: #1535247 Change-Id: I67c1323267aaf7e49f4a359ff50b94e52dba4380
This commit is contained in:
parent
b9ef98b4f7
commit
97a272a892
@ -164,10 +164,9 @@ Cached subset will be re-evaluated (to cut down the version sets as agents
|
||||
upgrade) after configured TTL.
|
||||
|
||||
As a fast path to update this cache on all neutron-servers when upgraded agents
|
||||
come up (or old agents revive after a long timeout or even a downgrade), we could
|
||||
introduce a fanout queue consumed by servers, to additionally notify from one
|
||||
agent to all neutron-servers about the "versions of interest" in the agent just
|
||||
comming up.
|
||||
come up (or old agents revive after a long timeout or even a downgrade) the
|
||||
server registering the new status update will notify the other servers about
|
||||
the new consumer resource versions via cast.
|
||||
|
||||
All notifications for all calculated version sets must be sent, as non-upgraded
|
||||
agents would otherwise not receive them.
|
||||
|
@ -12,16 +12,41 @@
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import pprint
|
||||
import time
|
||||
|
||||
from neutron_lib import constants
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import importutils
|
||||
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.callbacks import exceptions
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
VERSIONS_TTL = 60
|
||||
|
||||
# This is the list of agents that started using this rpc push/pull mechanism
|
||||
# for versioned objects, but at that time stable/liberty, they were not
|
||||
# reporting versions, so we need to assume they need QosPolicy 1.0
|
||||
#TODO(mangelajo): Remove this logic in Newton, since those agents will be
|
||||
# already reporting From N to O
|
||||
NON_REPORTING_AGENT_TYPES = [constants.AGENT_TYPE_OVS,
|
||||
constants.AGENT_TYPE_NIC_SWITCH]
|
||||
|
||||
|
||||
# NOTE(mangelajo): if we import this globally we end up with a (very
|
||||
# long) circular dependency, this can be fixed if we
|
||||
# stop importing all exposed classes in
|
||||
# neutron.api.rpc.callbacks.resources and provide
|
||||
# a decorator to expose classes
|
||||
def _import_resources():
|
||||
return importutils.import_module('neutron.api.rpc.callbacks.resources')
|
||||
|
||||
|
||||
AgentConsumer = collections.namedtuple('AgentConsumer', ['agent_type',
|
||||
'host'])
|
||||
AgentConsumer.__repr__ = lambda self: '%s@%s' % self
|
||||
|
||||
|
||||
class ResourceConsumerTracker(object):
|
||||
"""Class to be provided back by consumer_versions_callback.
|
||||
@ -59,6 +84,7 @@ class ResourceConsumerTracker(object):
|
||||
self._needs_recalculation = False
|
||||
|
||||
def _get_local_resource_versions(self):
|
||||
resources = _import_resources()
|
||||
local_resource_versions = collections.defaultdict(set)
|
||||
for resource_type, version in (
|
||||
resources.LOCAL_RESOURCE_VERSIONS.items()):
|
||||
@ -67,13 +93,17 @@ class ResourceConsumerTracker(object):
|
||||
|
||||
# TODO(mangelajo): add locking with _recalculate_versions if we ever
|
||||
# move out of green threads.
|
||||
def _set_version(self, consumer_id, resource_type, version):
|
||||
def _set_version(self, consumer, resource_type, version):
|
||||
"""Set or update a consumer resource type version."""
|
||||
self._versions[resource_type].add(version)
|
||||
prev_version = (
|
||||
self._versions_by_consumer[consumer_id].get(resource_type, None))
|
||||
self._versions_by_consumer[consumer_id][resource_type] = version
|
||||
if prev_version and (prev_version != version):
|
||||
consumer_versions = self._versions_by_consumer[consumer]
|
||||
prev_version = consumer_versions.get(resource_type, None)
|
||||
if version:
|
||||
consumer_versions[resource_type] = version
|
||||
else:
|
||||
consumer_versions.pop(resource_type, None)
|
||||
|
||||
if prev_version != version:
|
||||
# If a version got updated/changed in a consumer, we need to
|
||||
# recalculate the main dictionary of versions based on the
|
||||
# new _versions_by_consumer.
|
||||
@ -82,18 +112,46 @@ class ResourceConsumerTracker(object):
|
||||
self._needs_recalculation = True
|
||||
LOG.debug("Version for resource type %(resource_type)s changed "
|
||||
"%(prev_version)s to %(version)s on "
|
||||
"consumer %(consumer_id)s",
|
||||
"consumer %(consumer)s",
|
||||
{'resource_type': resource_type,
|
||||
'version': version,
|
||||
'prev_version': prev_version,
|
||||
'consumer_id': consumer_id})
|
||||
'consumer': consumer})
|
||||
|
||||
def set_versions(self, consumer_id, versions):
|
||||
def set_versions(self, consumer, versions):
|
||||
"""Set or update an specific consumer resource types."""
|
||||
for resource_type, resource_version in versions.items():
|
||||
self._set_version(consumer_id, resource_type,
|
||||
self._set_version(consumer, resource_type,
|
||||
resource_version)
|
||||
|
||||
if versions:
|
||||
self._cleanup_removed_versions(consumer, versions)
|
||||
else:
|
||||
self._handle_no_set_versions(consumer)
|
||||
|
||||
def _cleanup_removed_versions(self, consumer, versions):
|
||||
"""Check if any version report has been removed, and cleanup."""
|
||||
prev_resource_types = set(
|
||||
self._versions_by_consumer[consumer].keys())
|
||||
cur_resource_types = set(versions.keys())
|
||||
removed_resource_types = prev_resource_types - cur_resource_types
|
||||
for resource_type in removed_resource_types:
|
||||
self._set_version(consumer, resource_type, None)
|
||||
|
||||
def _handle_no_set_versions(self, consumer):
|
||||
"""Handle consumers reporting no versions."""
|
||||
if isinstance(consumer, AgentConsumer):
|
||||
if consumer.agent_type in NON_REPORTING_AGENT_TYPES:
|
||||
resources = _import_resources()
|
||||
self._versions_by_consumer[consumer] = {
|
||||
resources.QOS_POLICY: '1.0'}
|
||||
self._versions[resources.QOS_POLICY].add('1.0')
|
||||
return
|
||||
|
||||
if self._versions_by_consumer[consumer]:
|
||||
self._needs_recalculation = True
|
||||
self._versions_by_consumer[consumer] = {}
|
||||
|
||||
def get_resource_versions(self, resource_type):
|
||||
"""Fetch the versions necessary to notify all consumers."""
|
||||
if self._needs_recalculation:
|
||||
@ -102,6 +160,18 @@ class ResourceConsumerTracker(object):
|
||||
|
||||
return copy.copy(self._versions[resource_type])
|
||||
|
||||
def report(self):
|
||||
"""Output debug information about the consumer versions."""
|
||||
#TODO(mangelajo): report only when pushed_versions differ from
|
||||
# previous reports.
|
||||
format = lambda versions: pprint.pformat(dict(versions), indent=4)
|
||||
debug_dict = {'pushed_versions': format(self._versions),
|
||||
'consumer_versions': format(self._versions_by_consumer)}
|
||||
|
||||
LOG.debug('Tracked resource versions report:\n'
|
||||
'pushed versions:\n%(pushed_versions)s\n\n'
|
||||
'consumer versions:\n%(consumer_versions)s\n', debug_dict)
|
||||
|
||||
# TODO(mangelajo): Add locking if we ever move out of greenthreads.
|
||||
def _recalculate_versions(self):
|
||||
"""Recalculate the _versions set.
|
||||
@ -130,8 +200,9 @@ class CachedResourceConsumerTracker(object):
|
||||
new_tracker = ResourceConsumerTracker()
|
||||
self._consumer_versions_callback(new_tracker)
|
||||
self._versions = new_tracker
|
||||
self._versions.report()
|
||||
else:
|
||||
pass # TODO(mangelajo): throw exception if callback not provided
|
||||
raise exceptions.VersionsCallbackNotFound()
|
||||
|
||||
def _check_expiration(self):
|
||||
if time.time() > self._expires_at:
|
||||
@ -145,11 +216,22 @@ class CachedResourceConsumerTracker(object):
|
||||
self._check_expiration()
|
||||
return self._versions.get_resource_versions(resource_type)
|
||||
|
||||
def update_versions(self, consumer_id, resource_versions):
|
||||
self._versions.set_versions(consumer_id, resource_versions)
|
||||
def update_versions(self, consumer, resource_versions):
|
||||
self._versions.set_versions(consumer, resource_versions)
|
||||
|
||||
def report(self):
|
||||
self._check_expiration()
|
||||
self._versions.report()
|
||||
|
||||
_cached_version_tracker = None
|
||||
|
||||
|
||||
cached_version_tracker = CachedResourceConsumerTracker()
|
||||
#NOTE(ajo): add locking if we ever stop using greenthreads
|
||||
def _get_cached_tracker():
|
||||
global _cached_version_tracker
|
||||
if not _cached_version_tracker:
|
||||
_cached_version_tracker = CachedResourceConsumerTracker()
|
||||
return _cached_version_tracker
|
||||
|
||||
|
||||
def set_consumer_versions_callback(callback):
|
||||
@ -160,30 +242,23 @@ def set_consumer_versions_callback(callback):
|
||||
|
||||
The callback will receive a ResourceConsumerTracker object,
|
||||
and the ResourceConsumerTracker methods must be used to provide
|
||||
each consumer_id versions. Consumer ids can be obtained from this
|
||||
each consumer versions. Consumer ids can be obtained from this
|
||||
module via the next functions:
|
||||
* get_agent_consumer_id
|
||||
"""
|
||||
cached_version_tracker.set_consumer_versions_callback(callback)
|
||||
_get_cached_tracker().set_consumer_versions_callback(callback)
|
||||
|
||||
|
||||
def get_resource_versions(resource_type):
|
||||
"""Return the set of versions expected by the consumers of a resource."""
|
||||
return cached_version_tracker.get_resource_versions(resource_type)
|
||||
return _get_cached_tracker().get_resource_versions(resource_type)
|
||||
|
||||
|
||||
def update_versions(consumer_id, resource_versions):
|
||||
def update_versions(consumer, resource_versions):
|
||||
"""Update the resources' versions for a consumer id."""
|
||||
cached_version_tracker.set_versions(consumer_id, resource_versions)
|
||||
_get_cached_tracker().update_versions(consumer, resource_versions)
|
||||
|
||||
|
||||
def get_agent_consumer_id(agent_type, agent_host):
|
||||
"""Return a consumer id string for an agent type + host tuple.
|
||||
|
||||
The logic behind this function, is that, eventually we could have
|
||||
consumers of RPC callbacks which are not agents, thus we want
|
||||
to totally collate all the different consumer types and provide
|
||||
unique consumer ids.
|
||||
"""
|
||||
return "%(agent_type)s@%(agent_host)s" % {'agent_type': agent_type,
|
||||
'agent_host': agent_host}
|
||||
def report():
|
||||
"""Report resource versions in debug logs."""
|
||||
_get_cached_tracker().report()
|
||||
|
@ -120,6 +120,53 @@ class ResourcesPullRpcCallback(object):
|
||||
return obj.obj_to_primitive(target_version=version)
|
||||
|
||||
|
||||
class ResourcesPushToServersRpcApi(object):
|
||||
"""Publisher-side RPC (stub) for plugin-to-plugin fanout interaction.
|
||||
|
||||
This class implements the client side of an rpc interface. The receiver
|
||||
side can be found below: ResourcesPushToServerRpcCallback. For more
|
||||
information on this RPC interface, see doc/source/devref/rpc_callbacks.rst.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
target = oslo_messaging.Target(
|
||||
topic=topics.SERVER_RESOURCE_VERSIONS, version='1.0',
|
||||
namespace=constants.RPC_NAMESPACE_RESOURCES)
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def report_agent_resource_versions(self, context, agent_type, agent_host,
|
||||
version_map):
|
||||
"""Fan out all the agent resource versions to other servers."""
|
||||
cctxt = self.client.prepare(fanout=True)
|
||||
cctxt.cast(context, 'report_agent_resource_versions',
|
||||
agent_type=agent_type,
|
||||
agent_host=agent_host,
|
||||
version_map=version_map)
|
||||
|
||||
|
||||
class ResourcesPushToServerRpcCallback(object):
|
||||
"""Receiver-side RPC (implementation) for plugin-to-plugin interaction.
|
||||
|
||||
This class implements the receiver side of an rpc interface.
|
||||
The client side can be found above: ResourcePushToServerRpcApi. For more
|
||||
information on this RPC interface, see doc/source/devref/rpc_callbacks.rst.
|
||||
"""
|
||||
|
||||
# History
|
||||
# 1.0 Initial version
|
||||
|
||||
target = oslo_messaging.Target(
|
||||
version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def report_agent_resource_versions(self, context, agent_type, agent_host,
|
||||
version_map):
|
||||
consumer_id = version_manager.AgentConsumer(agent_type=agent_type,
|
||||
host=agent_host)
|
||||
version_manager.update_versions(consumer_id, version_map)
|
||||
|
||||
|
||||
class ResourcesPushRpcApi(object):
|
||||
"""Plugin-side RPC for plugin-to-agents interaction.
|
||||
|
||||
|
@ -27,6 +27,7 @@ UPDATE = 'update'
|
||||
|
||||
AGENT = 'q-agent-notifier'
|
||||
PLUGIN = 'q-plugin'
|
||||
SERVER_RESOURCE_VERSIONS = 'q-server-resource-versions'
|
||||
L3PLUGIN = 'q-l3-plugin'
|
||||
REPORTS = 'q-reports-plugin'
|
||||
DHCP = 'q-dhcp-notifer'
|
||||
|
@ -21,6 +21,7 @@ from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
import sqlalchemy as sa
|
||||
@ -28,6 +29,7 @@ from sqlalchemy.orm import exc
|
||||
from sqlalchemy import sql
|
||||
|
||||
from neutron._i18n import _, _LE, _LI, _LW
|
||||
from neutron.api.rpc.callbacks import version_manager
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants
|
||||
from neutron import context
|
||||
@ -68,6 +70,11 @@ AGENT_OPTS = [
|
||||
]
|
||||
cfg.CONF.register_opts(AGENT_OPTS)
|
||||
|
||||
# this is the ratio from agent_down_time to the time we use to consider
|
||||
# the agents down for considering their resource versions in the
|
||||
# version_manager callback
|
||||
DOWNTIME_VERSIONS_RATIO = 2
|
||||
|
||||
|
||||
class Agent(model_base.BASEV2, model_base.HasId):
|
||||
"""Represents agents running in neutron deployments."""
|
||||
@ -98,6 +105,10 @@ class Agent(model_base.BASEV2, model_base.HasId):
|
||||
description = sa.Column(sa.String(attributes.DESCRIPTION_MAX_LEN))
|
||||
# configurations: a json dict string, I think 4095 is enough
|
||||
configurations = sa.Column(sa.String(4095), nullable=False)
|
||||
# resource_versions: json dict, 8191 allows for ~256 resource versions
|
||||
# assuming ~32byte length "'name': 'ver',"
|
||||
# the whole row limit is 65535 bytes in mysql
|
||||
resource_versions = sa.Column(sa.String(8191))
|
||||
# load - number of resources hosted by the agent
|
||||
load = sa.Column(sa.Integer, server_default='0', nullable=False)
|
||||
|
||||
@ -162,6 +173,11 @@ class AgentAvailabilityZoneMixin(az_ext.AvailabilityZonePluginBase):
|
||||
class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
"""Mixin class to add agent extension to db_base_plugin_v2."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
version_manager.set_consumer_versions_callback(
|
||||
self._get_agents_resource_versions)
|
||||
super(AgentDbMixin, self).__init__(*args, **kwargs)
|
||||
|
||||
def _get_agent(self, context, id):
|
||||
try:
|
||||
agent = self._get_by_id(context, Agent, id)
|
||||
@ -186,18 +202,28 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
{'agent_type': agent_type, 'agent_id': agent.id})
|
||||
return agent
|
||||
|
||||
@classmethod
|
||||
def is_agent_down(cls, heart_beat_time):
|
||||
@staticmethod
|
||||
def is_agent_down(heart_beat_time):
|
||||
return timeutils.is_older_than(heart_beat_time,
|
||||
cfg.CONF.agent_down_time)
|
||||
|
||||
@staticmethod
|
||||
def is_agent_considered_for_versions(agent_dict):
|
||||
return not timeutils.is_older_than(agent_dict['heartbeat_timestamp'],
|
||||
cfg.CONF.agent_down_time *
|
||||
DOWNTIME_VERSIONS_RATIO)
|
||||
|
||||
def get_configuration_dict(self, agent_db):
|
||||
return self._get_dict(agent_db, 'configurations')
|
||||
|
||||
def _get_dict(self, agent_db, dict_name):
|
||||
try:
|
||||
conf = jsonutils.loads(agent_db.configurations)
|
||||
conf = jsonutils.loads(getattr(agent_db, dict_name))
|
||||
except Exception:
|
||||
msg = _LW('Configuration for agent %(agent_type)s on host %(host)s'
|
||||
' is invalid.')
|
||||
LOG.warn(msg, {'agent_type': agent_db.agent_type,
|
||||
msg = _LW('Dictionary %(dict_name)s for agent %(agent_type)s on '
|
||||
'host %(host)s is invalid.')
|
||||
LOG.warn(msg, {'dict_name': dict_name,
|
||||
'agent_type': agent_db.agent_type,
|
||||
'host': agent_db.host})
|
||||
conf = {}
|
||||
return conf
|
||||
@ -217,9 +243,9 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
ext_agent.RESOURCE_NAME + 's')
|
||||
res = dict((k, agent[k]) for k in attr
|
||||
if k not in ['alive', 'configurations'])
|
||||
res['alive'] = not AgentDbMixin.is_agent_down(
|
||||
res['heartbeat_timestamp'])
|
||||
res['configurations'] = self.get_configuration_dict(agent)
|
||||
res['alive'] = not self.is_agent_down(res['heartbeat_timestamp'])
|
||||
res['configurations'] = self._get_dict(agent, 'configurations')
|
||||
res['resource_versions'] = self._get_dict(agent, 'resource_versions')
|
||||
res['availability_zone'] = agent['availability_zone']
|
||||
return self._fields(res, fields)
|
||||
|
||||
@ -245,7 +271,6 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
filters=filters, fields=fields)
|
||||
alive = filters and filters.get('alive', None)
|
||||
if alive:
|
||||
# alive filter will be a list
|
||||
alive = attributes.convert_to_boolean(alive[0])
|
||||
agents = [agent for agent in agents if agent['alive'] == alive]
|
||||
return agents
|
||||
@ -311,6 +336,8 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
res['availability_zone'] = agent_state['availability_zone']
|
||||
configurations_dict = agent_state.get('configurations', {})
|
||||
res['configurations'] = jsonutils.dumps(configurations_dict)
|
||||
resource_versions_dict = agent_state.get('resource_versions', {})
|
||||
res['resource_versions'] = jsonutils.dumps(resource_versions_dict)
|
||||
res['load'] = self._get_agent_load(agent_state)
|
||||
current_time = timeutils.utcnow()
|
||||
try:
|
||||
@ -340,7 +367,6 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
|
||||
def create_or_update_agent(self, context, agent):
|
||||
"""Create or update agent according to report."""
|
||||
|
||||
try:
|
||||
return self._create_or_update_agent(context, agent)
|
||||
except db_exc.DBDuplicateEntry:
|
||||
@ -358,6 +384,29 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
# agent entry, which will be updated multiple times
|
||||
return self._create_or_update_agent(context, agent)
|
||||
|
||||
def _get_agents_considered_for_versions(self):
|
||||
up_agents = self.get_agents(context.get_admin_context(),
|
||||
filters={'admin_state_up': [True]})
|
||||
return filter(self.is_agent_considered_for_versions, up_agents)
|
||||
|
||||
def _get_agents_resource_versions(self, tracker):
|
||||
"""Get the known agent resource versions and update the tracker.
|
||||
|
||||
Receives a version_manager.ResourceConsumerTracker instance and it's
|
||||
expected to look up in to the database and update every agent resource
|
||||
versions.
|
||||
This method is called from version_manager when the cached information
|
||||
has passed TTL.
|
||||
"""
|
||||
for agent in self._get_agents_considered_for_versions():
|
||||
resource_versions = agent.get('resource_versions', {})
|
||||
consumer = version_manager.AgentConsumer(
|
||||
agent_type=agent['agent_type'], host=agent['host'])
|
||||
LOG.debug("Update consumer %(consumer)s versions to: "
|
||||
"%(versions)s", {'consumer': consumer,
|
||||
'versions': resource_versions})
|
||||
tracker.set_versions(consumer, resource_versions)
|
||||
|
||||
|
||||
class AgentExtRpcCallback(object):
|
||||
"""Processes the rpc report in plugin implementations.
|
||||
@ -378,6 +427,12 @@ class AgentExtRpcCallback(object):
|
||||
def __init__(self, plugin=None):
|
||||
super(AgentExtRpcCallback, self).__init__()
|
||||
self.plugin = plugin
|
||||
#TODO(ajo): fix the resources circular dependency issue by dynamically
|
||||
# registering object types in the RPC callbacks api
|
||||
resources_rpc = importutils.import_module(
|
||||
'neutron.api.rpc.handlers.resources_rpc')
|
||||
# Initialize RPC api directed to other neutron-servers
|
||||
self.server_versions_rpc = resources_rpc.ResourcesPushToServersRpcApi()
|
||||
|
||||
def report_state(self, context, **kwargs):
|
||||
"""Report state from agent to server.
|
||||
@ -398,7 +453,20 @@ class AgentExtRpcCallback(object):
|
||||
return
|
||||
if not self.plugin:
|
||||
self.plugin = manager.NeutronManager.get_plugin()
|
||||
return self.plugin.create_or_update_agent(context, agent_state)
|
||||
agent_status = self.plugin.create_or_update_agent(context, agent_state)
|
||||
self._update_local_agent_resource_versions(context, agent_state)
|
||||
return agent_status
|
||||
|
||||
def _update_local_agent_resource_versions(self, context, agent_state):
|
||||
resource_versions_dict = agent_state.get('resource_versions', {})
|
||||
version_manager.update_versions(
|
||||
version_manager.AgentConsumer(agent_type=agent_state['agent_type'],
|
||||
host=agent_state['host']),
|
||||
resource_versions_dict)
|
||||
# report other neutron-servers about this quickly
|
||||
self.server_versions_rpc.report_agent_resource_versions(
|
||||
context, agent_state['agent_type'], agent_state['host'],
|
||||
resource_versions_dict)
|
||||
|
||||
def _check_clock_sync_on_agent_start(self, agent_state, agent_time):
|
||||
"""Checks if the server and the agent times are in sync.
|
||||
|
@ -1 +1 @@
|
||||
15e43b934f81
|
||||
31ed664953e6
|
||||
|
@ -0,0 +1,34 @@
|
||||
# Copyright 2016 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
"""Add resource_versions row to agent table
|
||||
|
||||
Revision ID: 31ed664953e6
|
||||
Revises: c3a73f615e4
|
||||
Create Date: 2016-01-15 13:41:30.016915
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '31ed664953e6'
|
||||
down_revision = '15e43b934f81'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.add_column('agents',
|
||||
sa.Column('resource_versions', sa.String(length=8191)))
|
@ -27,6 +27,7 @@ from neutron._i18n import _LE, _LI
|
||||
from neutron.agent.l2.extensions import manager as ext_manager
|
||||
from neutron.agent import rpc as agent_rpc
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.common import config as common_config
|
||||
from neutron.common import constants
|
||||
from neutron.common import topics
|
||||
@ -84,12 +85,14 @@ class CommonAgentLoop(service.Service):
|
||||
configurations = {'extensions': self.ext_manager.names()}
|
||||
configurations.update(self.mgr.get_agent_configurations())
|
||||
|
||||
#TODO(mangelajo): optimize resource_versions (see ovs agent)
|
||||
self.agent_state = {
|
||||
'binary': self.agent_binary,
|
||||
'host': cfg.CONF.host,
|
||||
'topic': constants.L2_AGENT_TOPIC,
|
||||
'configurations': configurations,
|
||||
'agent_type': self.agent_type,
|
||||
'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
|
||||
'start_flag': True}
|
||||
|
||||
report_interval = cfg.CONF.AGENT.report_interval
|
||||
|
@ -29,6 +29,7 @@ from neutron._i18n import _, _LE, _LI, _LW
|
||||
from neutron.agent.l2.extensions import manager as ext_manager
|
||||
from neutron.agent import rpc as agent_rpc
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.common import config as common_config
|
||||
from neutron.common import constants as n_constants
|
||||
from neutron.common import topics
|
||||
@ -125,12 +126,15 @@ class SriovNicSwitchAgent(object):
|
||||
|
||||
configurations = {'device_mappings': physical_devices_mappings,
|
||||
'extensions': self.ext_manager.names()}
|
||||
|
||||
#TODO(mangelajo): optimize resource_versions (see ovs agent)
|
||||
self.agent_state = {
|
||||
'binary': 'neutron-sriov-nic-agent',
|
||||
'host': self.conf.host,
|
||||
'topic': n_constants.L2_AGENT_TOPIC,
|
||||
'configurations': configurations,
|
||||
'agent_type': n_constants.AGENT_TYPE_NIC_SWITCH,
|
||||
'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
|
||||
'start_flag': True}
|
||||
|
||||
# The initialization is complete; we can start receiving messages
|
||||
|
@ -37,6 +37,7 @@ from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import polling as linux_polling
|
||||
from neutron.agent import rpc as agent_rpc
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.handlers import dvr_rpc
|
||||
from neutron.common import config
|
||||
from neutron.common import constants as n_const
|
||||
@ -231,6 +232,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
self.enable_tunneling,
|
||||
self.enable_distributed_routing)
|
||||
|
||||
#TODO(mangelajo): optimize resource_versions to only report
|
||||
# versions about resources which are common,
|
||||
# or which are used by specific extensions.
|
||||
self.agent_state = {
|
||||
'binary': 'neutron-openvswitch-agent',
|
||||
'host': host,
|
||||
@ -250,6 +254,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
'ovs_capabilities': self.ovs.capabilities,
|
||||
'vhostuser_socket_dir':
|
||||
ovs_conf.vhostuser_socket_dir},
|
||||
'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
|
||||
'agent_type': agent_conf.agent_type,
|
||||
'start_flag': True}
|
||||
|
||||
|
@ -211,6 +211,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.topic = topics.PLUGIN
|
||||
self.conn = n_rpc.create_connection()
|
||||
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
|
||||
self.conn.create_consumer(
|
||||
topics.SERVER_RESOURCE_VERSIONS,
|
||||
[resources_rpc.ResourcesPushToServerRpcCallback()],
|
||||
fanout=True)
|
||||
# process state reports despite dedicated rpc workers
|
||||
self.conn.create_consumer(topics.REPORTS,
|
||||
[agents_db.AgentExtRpcCallback()],
|
||||
|
@ -10,16 +10,18 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import mock
|
||||
|
||||
from neutron.api.rpc.callbacks import exceptions
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.callbacks import version_manager
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
TEST_RESOURCE_TYPE = 'TestResourceType'
|
||||
TEST_RESOURCE_VERSION_A = '1.11'
|
||||
TEST_RESOURCE_VERSION_B = '1.12'
|
||||
TEST_VERSION_A = '1.11'
|
||||
TEST_VERSION_B = '1.12'
|
||||
|
||||
TEST_RESOURCE_TYPE_2 = 'AnotherResource'
|
||||
|
||||
@ -27,44 +29,85 @@ AGENT_HOST_1 = 'host-1'
|
||||
AGENT_HOST_2 = 'host-2'
|
||||
AGENT_TYPE_1 = 'dhcp-agent'
|
||||
AGENT_TYPE_2 = 'openvswitch-agent'
|
||||
CONSUMER_1 = version_manager.get_agent_consumer_id(AGENT_HOST_1, AGENT_TYPE_1)
|
||||
CONSUMER_2 = version_manager.get_agent_consumer_id(AGENT_HOST_2, AGENT_TYPE_2)
|
||||
CONSUMER_1 = version_manager.AgentConsumer(AGENT_TYPE_1, AGENT_HOST_1)
|
||||
CONSUMER_2 = version_manager.AgentConsumer(AGENT_TYPE_2, AGENT_HOST_2)
|
||||
|
||||
|
||||
class ResourceConsumerTrackerTest(base.BaseTestCase):
|
||||
|
||||
def test_consumer_set_versions(self):
|
||||
cv = version_manager.ResourceConsumerTracker()
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
|
||||
TEST_RESOURCE_VERSION_A})
|
||||
self.assertIn(TEST_RESOURCE_VERSION_A,
|
||||
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_A})
|
||||
self.assertIn(TEST_VERSION_A,
|
||||
cv.get_resource_versions(TEST_RESOURCE_TYPE))
|
||||
|
||||
def test_consumer_updates_version(self):
|
||||
cv = version_manager.ResourceConsumerTracker()
|
||||
for version in [TEST_RESOURCE_VERSION_A, TEST_RESOURCE_VERSION_B]:
|
||||
|
||||
for version in [TEST_VERSION_A, TEST_VERSION_B]:
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: version})
|
||||
|
||||
self.assertEqual(set([TEST_RESOURCE_VERSION_B]),
|
||||
self.assertEqual(set([TEST_VERSION_B]),
|
||||
cv.get_resource_versions(TEST_RESOURCE_TYPE))
|
||||
|
||||
def test_multiple_consumer_version_update(self):
|
||||
cv = version_manager.ResourceConsumerTracker()
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
|
||||
TEST_RESOURCE_VERSION_A})
|
||||
cv.set_versions(CONSUMER_2, {TEST_RESOURCE_TYPE:
|
||||
TEST_RESOURCE_VERSION_A})
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
|
||||
TEST_RESOURCE_VERSION_B})
|
||||
|
||||
self.assertEqual(set([TEST_RESOURCE_VERSION_A,
|
||||
TEST_RESOURCE_VERSION_B]),
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_A})
|
||||
cv.set_versions(CONSUMER_2, {TEST_RESOURCE_TYPE: TEST_VERSION_A})
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_B})
|
||||
|
||||
self.assertEqual(set([TEST_VERSION_A, TEST_VERSION_B]),
|
||||
cv.get_resource_versions(TEST_RESOURCE_TYPE))
|
||||
|
||||
def test_consumer_downgrades_removing_resource(self):
|
||||
cv = version_manager.ResourceConsumerTracker()
|
||||
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_B,
|
||||
TEST_RESOURCE_TYPE_2: TEST_VERSION_A})
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_A})
|
||||
|
||||
self.assertEqual(set(),
|
||||
cv.get_resource_versions(TEST_RESOURCE_TYPE_2))
|
||||
self.assertEqual(set([TEST_VERSION_A]),
|
||||
cv.get_resource_versions(TEST_RESOURCE_TYPE))
|
||||
|
||||
def test_consumer_downgrades_stops_reporting(self):
|
||||
cv = version_manager.ResourceConsumerTracker()
|
||||
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_B,
|
||||
TEST_RESOURCE_TYPE_2: TEST_VERSION_A})
|
||||
cv.set_versions(CONSUMER_1, {})
|
||||
|
||||
for resource_type in [TEST_RESOURCE_TYPE, TEST_RESOURCE_TYPE_2]:
|
||||
self.assertEqual(set(),
|
||||
cv.get_resource_versions(resource_type))
|
||||
|
||||
def test_compatibility_liberty_sriov_and_ovs_agents(self):
|
||||
|
||||
def _fake_local_versions(self):
|
||||
local_versions = collections.defaultdict(set)
|
||||
local_versions[resources.QOS_POLICY].add('1.11')
|
||||
return local_versions
|
||||
|
||||
for agent_type in version_manager.NON_REPORTING_AGENT_TYPES:
|
||||
consumer_id = version_manager.AgentConsumer(agent_type,
|
||||
AGENT_HOST_1)
|
||||
|
||||
cv = version_manager.ResourceConsumerTracker()
|
||||
cv._get_local_resource_versions = _fake_local_versions
|
||||
cv._versions = _fake_local_versions(mock.ANY)
|
||||
|
||||
cv.set_versions(consumer_id, {})
|
||||
|
||||
self.assertEqual(set(['1.0', '1.11']),
|
||||
cv.get_resource_versions(resources.QOS_POLICY))
|
||||
|
||||
def test_different_adds_triggers_recalculation(self):
|
||||
cv = version_manager.ResourceConsumerTracker()
|
||||
|
||||
for version in [TEST_RESOURCE_VERSION_A, TEST_RESOURCE_VERSION_B]:
|
||||
for version in [TEST_VERSION_A, TEST_VERSION_B]:
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: version})
|
||||
|
||||
self.assertTrue(cv._needs_recalculation)
|
||||
@ -75,8 +118,7 @@ class ResourceConsumerTrackerTest(base.BaseTestCase):
|
||||
|
||||
class CachedResourceConsumerTrackerTest(base.BaseTestCase):
|
||||
|
||||
# TODO(mangelajo): re-enable once we provide the callback
|
||||
def _test_exception_with_no_callback(self):
|
||||
def test_exception_with_no_callback(self):
|
||||
cached_tracker = version_manager.CachedResourceConsumerTracker()
|
||||
self.assertRaises(
|
||||
exceptions.VersionsCallbackNotFound,
|
||||
@ -85,7 +127,7 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase):
|
||||
def _set_consumer_versions_callback(self, cached_tracker):
|
||||
def consumer_versions(rct):
|
||||
rct.set_versions(CONSUMER_1,
|
||||
{TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_A})
|
||||
{TEST_RESOURCE_TYPE: TEST_VERSION_A})
|
||||
|
||||
cached_tracker.set_consumer_versions_callback(consumer_versions)
|
||||
|
||||
@ -93,7 +135,7 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase):
|
||||
cached_tracker = version_manager.CachedResourceConsumerTracker()
|
||||
self._set_consumer_versions_callback(cached_tracker)
|
||||
|
||||
self.assertIn(TEST_RESOURCE_VERSION_A,
|
||||
self.assertIn(TEST_VERSION_A,
|
||||
cached_tracker.get_resource_versions(
|
||||
TEST_RESOURCE_TYPE))
|
||||
|
||||
@ -108,8 +150,8 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase):
|
||||
TEST_RESOURCE_TYPE_2)
|
||||
|
||||
cached_tracker.update_versions(
|
||||
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_B,
|
||||
TEST_RESOURCE_TYPE_2: TEST_RESOURCE_VERSION_A})
|
||||
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_B,
|
||||
TEST_RESOURCE_TYPE_2: TEST_VERSION_A})
|
||||
|
||||
final_versions = cached_tracker.get_resource_versions(
|
||||
TEST_RESOURCE_TYPE)
|
||||
@ -124,7 +166,7 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase):
|
||||
|
||||
def consumer_versions_callback(consumer_tracker):
|
||||
consumer_tracker.set_versions(
|
||||
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_A})
|
||||
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_A})
|
||||
self.refreshed = True
|
||||
|
||||
cached_tracker = version_manager.CachedResourceConsumerTracker()
|
||||
|
@ -143,6 +143,21 @@ class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase):
|
||||
resource_id)
|
||||
|
||||
|
||||
class ResourcesPushToServerRpcCallbackTestCase(ResourcesRpcBaseTestCase):
|
||||
|
||||
def test_report_versions(self):
|
||||
callbacks = resources_rpc.ResourcesPushToServerRpcCallback()
|
||||
with mock.patch('neutron.api.rpc.callbacks.version_manager'
|
||||
'.update_versions') as update_versions:
|
||||
version_map = {'A': '1.0'}
|
||||
callbacks.report_agent_resource_versions(context=mock.ANY,
|
||||
agent_type='DHCP Agent',
|
||||
agent_host='fake-host',
|
||||
version_map=version_map)
|
||||
update_versions.assert_called_once_with(mock.ANY,
|
||||
version_map)
|
||||
|
||||
|
||||
class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -1,3 +1,4 @@
|
||||
# pylint: disable=pointless-string-statement
|
||||
# Copyright (c) 2013 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
@ -26,6 +27,7 @@ from neutron.common import exceptions as n_exc
|
||||
from neutron import context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import db_base_plugin_v2 as base_plugin
|
||||
from neutron.tests import base
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
# the below code is required for the following reason
|
||||
@ -38,6 +40,15 @@ from neutron.tests.unit import testlib_api
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
|
||||
TEST_RESOURCE_VERSIONS = {"A": "1.0"}
|
||||
AGENT_STATUS = {'agent_type': 'Open vSwitch agent',
|
||||
'binary': 'neutron-openvswitch-agent',
|
||||
'host': 'overcloud-notcompute',
|
||||
'topic': 'N/A',
|
||||
'resource_versions': TEST_RESOURCE_VERSIONS}
|
||||
TEST_TIME = '2016-02-26T17:08:06.116'
|
||||
|
||||
|
||||
class FakePlugin(base_plugin.NeutronDbPluginV2, agents_db.AgentDbMixin):
|
||||
"""A fake plugin class containing all DB methods."""
|
||||
|
||||
@ -55,7 +66,8 @@ class TestAgentsDbBase(testlib_api.SqlTestCase):
|
||||
host=host,
|
||||
agent_type=agent_type,
|
||||
topic='foo_topic',
|
||||
configurations="",
|
||||
configurations="{}",
|
||||
resource_versions="{}",
|
||||
created_at=timeutils.utcnow(),
|
||||
started_at=timeutils.utcnow(),
|
||||
heartbeat_timestamp=timeutils.utcnow())
|
||||
@ -67,12 +79,19 @@ class TestAgentsDbBase(testlib_api.SqlTestCase):
|
||||
with self.context.session.begin(subtransactions=True):
|
||||
self.context.session.add(agent)
|
||||
|
||||
def _create_and_save_agents(self, hosts, agent_type, down_agents_count=0):
|
||||
def _create_and_save_agents(self, hosts, agent_type, down_agents_count=0,
|
||||
down_but_version_considered=0):
|
||||
agents = self._get_agents(hosts, agent_type)
|
||||
# bring down the specified agents
|
||||
for agent in agents[:down_agents_count]:
|
||||
agent['heartbeat_timestamp'] -= datetime.timedelta(minutes=60)
|
||||
|
||||
# bring down just enough so their version is still considered
|
||||
for agent in agents[down_agents_count:(
|
||||
down_but_version_considered + down_agents_count)]:
|
||||
agent['heartbeat_timestamp'] -= datetime.timedelta(
|
||||
seconds=(cfg.CONF.agent_down_time + 1))
|
||||
|
||||
self._save_agents(agents)
|
||||
return agents
|
||||
|
||||
@ -81,12 +100,7 @@ class TestAgentsDbMixin(TestAgentsDbBase):
|
||||
def setUp(self):
|
||||
super(TestAgentsDbMixin, self).setUp()
|
||||
|
||||
self.agent_status = {
|
||||
'agent_type': 'Open vSwitch agent',
|
||||
'binary': 'neutron-openvswitch-agent',
|
||||
'host': 'overcloud-notcompute',
|
||||
'topic': 'N/A'
|
||||
}
|
||||
self.agent_status = dict(AGENT_STATUS)
|
||||
|
||||
def test_get_enabled_agent_on_host_found(self):
|
||||
agents = self._create_and_save_agents(['foo_host'],
|
||||
@ -182,6 +196,27 @@ class TestAgentsDbMixin(TestAgentsDbBase):
|
||||
" DHCP Agent 2015-05-06 22:40:40.432295 some.node"}
|
||||
)
|
||||
|
||||
def test_get_dict(self):
|
||||
db_obj = mock.Mock(conf1='{"test": "1234"}')
|
||||
conf1 = self.plugin._get_dict(db_obj, 'conf1')
|
||||
self.assertIn('test', conf1)
|
||||
self.assertEqual("1234", conf1['test'])
|
||||
|
||||
def get_configurations_dict(self):
|
||||
db_obj = mock.Mock(configurations='{"cfg1": "val1"}')
|
||||
cfg = self.plugin.get_configuration_dict(db_obj)
|
||||
self.assertIn('cfg', cfg)
|
||||
|
||||
def test__get_agents_resource_versions(self):
|
||||
tracker = mock.Mock()
|
||||
self._create_and_save_agents(
|
||||
['host-%d' % i for i in range(5)],
|
||||
constants.AGENT_TYPE_L3,
|
||||
down_agents_count=3,
|
||||
down_but_version_considered=2)
|
||||
self.plugin._get_agents_resource_versions(tracker)
|
||||
self.assertEqual(tracker.set_versions.call_count, 2)
|
||||
|
||||
|
||||
class TestAgentsDbGetAgents(TestAgentsDbBase):
|
||||
scenarios = [
|
||||
@ -234,3 +269,33 @@ class TestAgentsDbGetAgents(TestAgentsDbBase):
|
||||
self.agents_alive == 'true')
|
||||
for agent in returned_agents:
|
||||
self.assertEqual(alive, agent['alive'])
|
||||
|
||||
|
||||
class TestAgentExtRpcCallback(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestAgentExtRpcCallback, self).setUp()
|
||||
self.plugin = mock.Mock()
|
||||
self.context = mock.Mock()
|
||||
self.callback = agents_db.AgentExtRpcCallback(self.plugin)
|
||||
self.callback.server_versions_rpc = mock.Mock()
|
||||
self.callback.START_TIME = datetime.datetime(datetime.MINYEAR, 1, 1)
|
||||
self.update_versions = mock.patch(
|
||||
'neutron.api.rpc.callbacks.version_manager.'
|
||||
'update_versions').start()
|
||||
self.agent_state = {'agent_state': dict(AGENT_STATUS)}
|
||||
|
||||
def test_create_or_update_agent_updates_version_manager(self):
|
||||
self.callback.report_state(self.context, agent_state=self.agent_state,
|
||||
time=TEST_TIME)
|
||||
self.update_versions.assert_called_once_with(
|
||||
mock.ANY, TEST_RESOURCE_VERSIONS)
|
||||
|
||||
def test_create_or_update_agent_updates_other_servers(self):
|
||||
callback = self.callback
|
||||
callback.report_state(self.context, agent_state=self.agent_state,
|
||||
time=TEST_TIME)
|
||||
report_agent_resource_versions = (
|
||||
callback.server_versions_rpc.report_agent_resource_versions)
|
||||
report_agent_resource_versions.assert_called_once_with(
|
||||
mock.ANY, mock.ANY, mock.ANY, TEST_RESOURCE_VERSIONS)
|
||||
|
Loading…
Reference in New Issue
Block a user