Use Agent OVO in agents_db and test_agents_db

Agent object has been merged [1].
This patch uses Agent object in agents_db and test_agents_db.

We also introduce a new function (get_agents_object) and keep
the old function (get_agents_db) for backward compatibility.

[1] https://review.openstack.org/#/c/297887/

Co-Authored-By: Nguyen Phuong An <AnNP@vn.fujitsu.com>
Change-Id: I4c4283cb1aa05d52dca00cc249e094ea7d55b1d3
Partially-Implements: blueprint adopt-oslo-versioned-objects-for-db
This commit is contained in:
Vu Cong Tuan 2017-08-18 17:01:08 +07:00
parent 8eaac1f8a9
commit 8b381c7324
10 changed files with 113 additions and 90 deletions

View File

@ -31,8 +31,6 @@ import oslo_messaging
from oslo_serialization import jsonutils
from oslo_utils import importutils
from oslo_utils import timeutils
from sqlalchemy.orm import exc
from sqlalchemy import sql
from neutron.agent.common import utils
from neutron.api.rpc.callbacks import version_manager
@ -44,6 +42,7 @@ from neutron.db import api as db_api
from neutron.db.models import agent as agent_model
from neutron.extensions import agent as ext_agent
from neutron.extensions import availability_zone as az_ext
from neutron.objects import agent as agent_obj
LOG = logging.getLogger(__name__)
@ -56,17 +55,23 @@ agents_db.register_db_agents_opts()
DOWNTIME_VERSIONS_RATIO = 2
def get_availability_zones_by_agent_type(context, agent_type,
availability_zones):
"""Get list of availability zones based on agent type"""
agents = agent_obj.Agent._get_agents_by_availability_zones_and_agent_type(
context, agent_type=agent_type, availability_zones=availability_zones)
return set(agent.availability_zone for agent in agents)
class AgentAvailabilityZoneMixin(az_ext.AvailabilityZonePluginBase):
"""Mixin class to add availability_zone extension to AgentDbMixin."""
def _list_availability_zones(self, context, filters=None):
result = {}
query = model_query.get_collection_query(context, agent_model.Agent,
filters=filters)
columns = (agent_model.Agent.admin_state_up,
agent_model.Agent.availability_zone,
agent_model.Agent.agent_type)
for agent in query.with_entities(*columns).group_by(*columns):
filters = filters or {}
agents = agent_obj.Agent.get_objects(context, filters)
for agent in agents:
if not agent.availability_zone:
continue
if agent.agent_type == constants.AGENT_TYPE_DHCP:
@ -104,13 +109,8 @@ class AgentAvailabilityZoneMixin(az_ext.AvailabilityZonePluginBase):
agent_type = constants.AGENT_TYPE_L3
else:
return
query = context.session.query(
agent_model.Agent.availability_zone).filter_by(
agent_type=agent_type).group_by(
agent_model.Agent.availability_zone)
query = query.filter(
agent_model.Agent.availability_zone.in_(availability_zones))
azs = [item[0] for item in query]
azs = get_availability_zones_by_agent_type(
context, agent_type, availability_zones)
diff = set(availability_zones) - set(azs)
if diff:
raise az_exc.AvailabilityZoneNotFound(availability_zone=diff.pop())
@ -120,22 +120,21 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
"""Mixin class to add agent extension to db_base_plugin_v2."""
def _get_agent(self, context, id):
try:
agent = model_query.get_by_id(context, agent_model.Agent, id)
except exc.NoResultFound:
agent = agent_obj.Agent.get_object(context, id=id)
if not agent:
raise ext_agent.AgentNotFound(id=id)
return agent
@db_api.retry_if_session_inactive()
def get_enabled_agent_on_host(self, context, agent_type, host):
"""Return agent of agent_type for the specified host."""
query = context.session.query(agent_model.Agent)
query = query.filter(agent_model.Agent.agent_type == agent_type,
agent_model.Agent.host == host,
agent_model.Agent.admin_state_up == sql.true())
try:
agent = query.one()
except exc.NoResultFound:
agent = agent_obj.Agent.get_object(context,
agent_type=agent_type,
host=host,
admin_state_up=True)
if not agent:
LOG.debug('No enabled %(agent_type)s agent on host '
'%(host)s', {'agent_type': agent_type, 'host': host})
return
@ -158,7 +157,14 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
json_value = None
try:
json_value = getattr(agent_db, dict_name)
conf = jsonutils.loads(json_value)
# TODO(tuanvu): after all agent_db is converted to agent_obj,
# we no longer need this.
# Without this check, some unit tests will fail
# because some of json_values are dict already
if not isinstance(json_value, dict):
conf = jsonutils.loads(json_value)
else:
conf = json_value
except Exception:
if json_value or not ignore_missing:
msg = ('Dictionary %(dict_name)s for agent %(agent_type)s '
@ -198,34 +204,41 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
agent = self._get_agent(context, id)
registry.notify(resources.AGENT, events.BEFORE_DELETE, self,
context=context, agent=agent)
with context.session.begin(subtransactions=True):
context.session.delete(agent)
agent.delete()
@db_api.retry_if_session_inactive()
def update_agent(self, context, id, agent):
agent_data = agent['agent']
with context.session.begin(subtransactions=True):
agent = self._get_agent(context, id)
agent.update(agent_data)
agent.update_fields(agent_data)
agent.update()
return self._make_agent_dict(agent)
@db_api.retry_if_session_inactive()
def get_agents_db(self, context, filters=None):
# TODO(annp): keep this method for backward compatibility,
# will need to clean it up later
query = model_query.get_collection_query(context,
agent_model.Agent,
filters=filters)
return query.all()
@db_api.retry_if_session_inactive()
def get_agent_objects(self, context, filters=None):
filters = filters or {}
return agent_obj.Agent.get_objects(context, **filters)
@db_api.retry_if_session_inactive()
def get_agents(self, context, filters=None, fields=None):
agents = model_query.get_collection(context, agent_model.Agent,
self._make_agent_dict,
filters=filters, fields=fields)
alive = filters and filters.get('alive', None)
filters = filters or {}
alive = filters and filters.pop('alive', None)
agents = agent_obj.Agent.get_objects(context, **filters)
if alive:
alive = converters.convert_to_boolean(alive[0])
agents = [agent for agent in agents if agent['alive'] == alive]
return agents
agents = [agent for agent in agents if agent.is_active == alive]
return [self._make_agent_dict(agent, fields=fields)
for agent in agents]
@db_api.retry_db_errors
def agent_health_check(self):
@ -249,17 +262,16 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
len(agents))
def _get_agent_by_type_and_host(self, context, agent_type, host):
query = model_query.query_with_hooks(context, agent_model.Agent)
try:
agent_db = query.filter(agent_model.Agent.agent_type == agent_type,
agent_model.Agent.host == host).one()
return agent_db
except exc.NoResultFound:
agent_objs = agent_obj.Agent.get_objects(context,
agent_type=agent_type,
host=host)
if not agent_objs:
raise ext_agent.AgentNotFoundByTypeHost(agent_type=agent_type,
host=host)
except exc.MultipleResultsFound:
raise ext_agent.MultipleAgentFoundByTypeHost(agent_type=agent_type,
host=host)
if len(agent_objs) > 1:
raise ext_agent.MultipleAgentFoundByTypeHost(
agent_type=agent_type, host=host)
return agent_objs[0]
@db_api.retry_if_session_inactive()
def get_agent(self, context, id, fields=None):
@ -312,9 +324,9 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
res['load'] = self._get_agent_load(agent_state)
current_time = timeutils.utcnow()
try:
agent_db = self._get_agent_by_type_and_host(
agent = self._get_agent_by_type_and_host(
context, agent_state['agent_type'], agent_state['host'])
if not agent_db.is_active:
if not agent.is_active:
status = agent_consts.AGENT_REVIVED
if 'resource_versions' not in agent_state:
# updating agent_state with resource_versions taken
@ -322,13 +334,14 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
# _update_local_agent_resource_versions() will call
# version_manager and bring it up to date
agent_state['resource_versions'] = self._get_dict(
agent_db, 'resource_versions', ignore_missing=True)
agent, 'resource_versions', ignore_missing=True)
res['heartbeat_timestamp'] = current_time
if agent_state.get('start_flag'):
res['started_at'] = current_time
greenthread.sleep(0)
self._log_heartbeat(agent_state, agent_db, configurations_dict)
agent_db.update(res)
self._log_heartbeat(agent_state, agent, configurations_dict)
agent.update_fields(res)
agent.update()
event_type = events.AFTER_UPDATE
except ext_agent.AgentNotFoundByTypeHost:
greenthread.sleep(0)
@ -336,11 +349,11 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
res['started_at'] = current_time
res['heartbeat_timestamp'] = current_time
res['admin_state_up'] = cfg.CONF.enable_new_agents
agent_db = agent_model.Agent(**res)
agent = agent_obj.Agent(context=context, **res)
greenthread.sleep(0)
context.session.add(agent_db)
agent.create()
event_type = events.AFTER_CREATE
self._log_heartbeat(agent_state, agent_db, configurations_dict)
self._log_heartbeat(agent_state, agent, configurations_dict)
status = agent_consts.AGENT_NEW
greenthread.sleep(0)

View File

@ -292,7 +292,7 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
dead_bindings = [b for b in
self._filter_bindings(context, down_bindings)]
agents = self.get_agents_db(
agents = self.get_agent_objects(
context, {'agent_type': [constants.AGENT_TYPE_DHCP]})
if not agents:
# No agents configured so nothing to do.

View File

@ -144,3 +144,15 @@ class Agent(base.NeutronDbObject):
hosts = [host[0] for host in query]
agents = cls.get_objects(context, host=hosts)
return agents
@classmethod
def _get_agents_by_availability_zones_and_agent_type(
cls, context, agent_type, availability_zones):
query = context.session.query(
agent_model.Agent).filter_by(
agent_type=agent_type).group_by(
agent_model.Agent.availability_zone)
query = query.filter(
agent_model.Agent.availability_zone.in_(availability_zones)).all()
agents = [cls._load_object(context, record) for record in query]
return agents

View File

@ -37,9 +37,8 @@ def count(context, model, **kwargs):
def _kwargs_to_filters(**kwargs):
return {k: v if (isinstance(v, list) or
isinstance(v, obj_utils.StringMatchingFilterObj))
else [v]
retain_classes = (list, set, obj_utils.StringMatchingFilterObj)
return {k: v if isinstance(v, retain_classes) else [v]
for k, v in kwargs.items()}

View File

@ -28,7 +28,6 @@ class BaseResourceFilter(object):
def bind(self, context, agents, resource_id):
"""Bind the resource to the agents."""
with context.session.begin(subtransactions=True):
res = {}
for agent in agents:
# Load is being incremented here to reflect latest agent load
# even within the agent report interval. This will be very
@ -39,5 +38,5 @@ class BaseResourceFilter(object):
# problem because "+ 1" here does not meant to predict
# precisely what the load of the agent will be. The value will
# be corrected by the agent on the next report interval.
res['load'] = agent.load + 1
agent.update(res)
agent.load += 1
agent.update()

View File

@ -22,10 +22,9 @@ from neutron_lib import constants
from neutron_lib.objects import exceptions
from oslo_config import cfg
from oslo_log import log as logging
from sqlalchemy import sql
from neutron.agent.common import utils as agent_utils
from neutron.db.models import agent as agent_model
from neutron.objects import agent as agent_obj
from neutron.objects import network
from neutron.scheduler import base_resource_filter
from neutron.scheduler import base_scheduler
@ -56,12 +55,9 @@ class AutoScheduler(object):
if not net_ids:
LOG.debug('No non-hosted networks')
return False
query = context.session.query(agent_model.Agent)
query = query.filter(
agent_model.Agent.agent_type == constants.AGENT_TYPE_DHCP,
agent_model.Agent.host == host,
agent_model.Agent.admin_state_up == sql.true())
dhcp_agents = query.all()
dhcp_agents = agent_obj.Agent.get_objects(
context, agent_type=constants.AGENT_TYPE_DHCP,
host=host, admin_state_up=True)
segment_host_mapping = network.SegmentHostMapping.get_objects(
context, host=host)
@ -244,7 +240,7 @@ class DhcpFilter(base_resource_filter.BaseResourceFilter):
'admin_state_up': [True]}
if az_hints:
filters['availability_zone'] = az_hints
active_dhcp_agents = plugin.get_agents_db(
active_dhcp_agents = plugin.get_agent_objects(
context, filters=filters)
if not active_dhcp_agents:
LOG.warning('No more DHCP agents')

View File

@ -28,7 +28,8 @@ import testscenarios
from neutron.db import agents_db
from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.db.models import agent as agent_model
from neutron.objects import agent as agent_obj
from neutron.objects import base
from neutron.tests.unit import testlib_api
# the below code is required for the following reason
@ -62,24 +63,19 @@ class TestAgentsDbBase(testlib_api.SqlTestCase):
def _get_agents(self, hosts, agent_type):
return [
agent_model.Agent(
agent_obj.Agent(
context=self.context,
binary='foo-agent',
host=host,
agent_type=agent_type,
topic='foo_topic',
configurations="{}",
resource_versions="{}",
created_at=timeutils.utcnow(),
started_at=timeutils.utcnow(),
heartbeat_timestamp=timeutils.utcnow())
for host in hosts
]
def _save_agents(self, agents):
for agent in agents:
with self.context.session.begin(subtransactions=True):
self.context.session.add(agent)
def _create_and_save_agents(self, hosts, agent_type, down_agents_count=0,
down_but_version_considered=0):
agents = self._get_agents(hosts, agent_type)
@ -93,7 +89,8 @@ class TestAgentsDbBase(testlib_api.SqlTestCase):
agent['heartbeat_timestamp'] -= datetime.timedelta(
seconds=(cfg.CONF.agent_down_time + 1))
self._save_agents(agents)
for agent in agents:
agent.create()
return agents
@ -159,12 +156,16 @@ class TestAgentsDbMixin(TestAgentsDbBase):
# NOTE(rpodolyaka): emulate violation of the unique constraint caused
# by a concurrent insert. Ensure we make another
# attempt on fail
with mock.patch('sqlalchemy.orm.Session.add') as add_mock:
mock.patch(
'neutron.objects.base.NeutronDbObject.modify_fields_from_db'
).start()
mock.patch.object(self.context.session, 'expunge').start()
with mock.patch('neutron.objects.db.api.create_object') as add_mock:
add_mock.side_effect = [
exc.DBDuplicateEntry(),
None
mock.Mock()
]
self.plugin.create_or_update_agent(self.context, self.agent_status)
self.assertEqual(add_mock.call_count, 2,
@ -360,7 +361,10 @@ class TestAgentExtRpcCallback(TestAgentsDbBase):
def _take_down_agent(self):
with self.context.session.begin(subtransactions=True):
query = self.context.session.query(agent_model.Agent)
agt = query.first()
agt.heartbeat_timestamp = (
agt.heartbeat_timestamp - datetime.timedelta(hours=1))
pager = base.Pager(limit=1)
agent_objs = agent_obj.Agent.get_objects(self.context,
_pager=pager)
agent_objs[0].heartbeat_timestamp = (
agent_objs[0].heartbeat_timestamp - datetime.timedelta(
hours=1))
agent_objs[0].update()

View File

@ -532,7 +532,7 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
'get_dhcp_agents_hosting_networks',
autospec=True) as mock_hosting_agents:
mock_hosting_agents.return_value = plugin.get_agents_db(
mock_hosting_agents.return_value = plugin.get_agent_objects(
self.adminContext)
with self.network('test') as net1:
pass

View File

@ -213,15 +213,15 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
def _bind_router(self, router_id, tenant_id):
scheduler = l3_agent_scheduler.ChanceScheduler()
filters = {'agent_type': [constants.AGENT_TYPE_L3]}
agents_db = self.plugin.get_agents_db(self.adminContext,
filters=filters)
for agent_db in agents_db:
agents_object = self.plugin.get_agent_objects(
self.adminContext, filters=filters)
for agent_obj in agents_object:
scheduler.create_ha_port_and_bind(
self.plugin,
self.adminContext,
router_id,
tenant_id,
agent_db)
agent_obj)
self._bind_ha_network_ports(router_id)
def _bind_ha_network_ports(self, router_id):

View File

@ -116,7 +116,7 @@ class TestDhcpScheduler(TestDhcpSchedulerBaseTestCase):
plugin = mock.Mock()
plugin.get_subnets.return_value = [{"network_id": self.network_id,
"enable_dhcp": True}]
plugin.get_agents_db.return_value = dead_agent + alive_agent
plugin.get_agent_objects.return_value = dead_agent + alive_agent
plugin.filter_hosts_with_network_access.side_effect = (
lambda context, network_id, hosts: hosts)
if active_hosts_only: