Merge "Switch to new engine facade for Schedulers"
This commit is contained in:
commit
76754e06f5
@ -19,6 +19,7 @@ import time
|
||||
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import context as ncontext
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from neutron_lib.exceptions import agent as agent_exc
|
||||
from neutron_lib.exceptions import dhcpagentscheduler as das_exc
|
||||
@ -378,7 +379,7 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
|
||||
|
||||
def add_network_to_dhcp_agent(self, context, id, network_id):
|
||||
self._get_network(context, network_id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
agent_db = self._get_agent(context, id)
|
||||
if (agent_db['agent_type'] != constants.AGENT_TYPE_DHCP or
|
||||
not services_available(agent_db['admin_state_up'])):
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
from neutron_lib.api import extensions
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib.exceptions import agent as agent_exc
|
||||
from neutron_lib.plugins import constants as plugin_constants
|
||||
from neutron_lib.plugins import directory
|
||||
@ -159,7 +160,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
||||
if not self.router_supports_scheduling(context, router_id):
|
||||
raise l3agentscheduler.RouterDoesntSupportScheduling(
|
||||
router_id=router_id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
router = self.get_router(context, router_id)
|
||||
agent = self._get_agent(context, agent_id)
|
||||
self.validate_agent_router_combination(context, agent, router)
|
||||
@ -226,7 +227,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
||||
context, router_id=router_id, l3_agent_id=agent_id)
|
||||
|
||||
def _unschedule_router(self, context, router_id, agents_ids):
|
||||
with context.session.begin(subtransactions=True):
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
for agent_id in agents_ids:
|
||||
self._unbind_router(context, router_id, agent_id)
|
||||
|
||||
@ -238,7 +239,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
||||
"""
|
||||
cur_agents = self.list_l3_agents_hosting_router(
|
||||
context, router_id)['agents']
|
||||
with context.session.begin(subtransactions=True):
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
cur_agents_ids = [agent['id'] for agent in cur_agents]
|
||||
self._unschedule_router(context, router_id, cur_agents_ids)
|
||||
|
||||
@ -390,7 +391,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
||||
context, router_ids))
|
||||
|
||||
def list_l3_agents_hosting_router(self, context, router_id):
|
||||
with context.session.begin(subtransactions=True):
|
||||
with db_api.CONTEXT_READER.using(context):
|
||||
agents = self._get_l3_agents_hosting_routers(
|
||||
context, [router_id])
|
||||
return {'agents': [self._make_agent_dict(agent)
|
||||
|
@ -19,6 +19,7 @@ from operator import itemgetter
|
||||
|
||||
from neutron_lib.api.definitions import availability_zone as az_def
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib.objects import exceptions
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
@ -41,7 +42,9 @@ class AutoScheduler(object):
|
||||
agents_per_network = cfg.CONF.dhcp_agents_per_network
|
||||
# a list of (agent, net_ids) tuples
|
||||
bindings_to_add = []
|
||||
with context.session.begin(subtransactions=True):
|
||||
# NOTE(ralonsoh) use writer manager to call get_network. See
|
||||
# https://review.opendev.org/#/c/483518/. Must be changed to READER.
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
fields = ['network_id', 'enable_dhcp', 'segment_id']
|
||||
subnets = plugin.get_subnets(context, fields=fields)
|
||||
net_ids = {}
|
||||
@ -230,7 +233,7 @@ class DhcpFilter(base_resource_filter.BaseResourceFilter):
|
||||
agents_per_network = cfg.CONF.dhcp_agents_per_network
|
||||
# TODO(gongysh) don't schedule the networks with only
|
||||
# subnets whose enable_dhcp is false
|
||||
with context.session.begin(subtransactions=True):
|
||||
with db_api.CONTEXT_READER.using(context):
|
||||
network_hosted_agents = plugin.get_dhcp_agents_hosting_networks(
|
||||
context, [network['id']], hosts=network.get('candidate_hosts'))
|
||||
if len(network_hosted_agents) >= agents_per_network:
|
||||
@ -241,7 +244,7 @@ class DhcpFilter(base_resource_filter.BaseResourceFilter):
|
||||
|
||||
def _get_active_agents(self, plugin, context, az_hints):
|
||||
"""Return a list of active dhcp agents."""
|
||||
with context.session.begin(subtransactions=True):
|
||||
with db_api.CONTEXT_READER.using(context):
|
||||
filters = {'agent_type': [constants.AGENT_TYPE_DHCP],
|
||||
'admin_state_up': [True]}
|
||||
if az_hints:
|
||||
|
@ -117,7 +117,7 @@ class L3Scheduler(object):
|
||||
def _get_candidates(self, plugin, context, sync_router):
|
||||
"""Return L3 agents where a router could be scheduled."""
|
||||
is_ha = sync_router.get('ha', False)
|
||||
with context.session.begin(subtransactions=True):
|
||||
with lib_db_api.CONTEXT_READER.using(context):
|
||||
# allow one router is hosted by just
|
||||
# one enabled l3 agent hosting since active is just a
|
||||
# timing problem. Non-active l3 agent can return to
|
||||
@ -168,7 +168,7 @@ class L3Scheduler(object):
|
||||
binding_index) fails because some other RouterL3AgentBinding was
|
||||
concurrently created using the same binding_index, then the function
|
||||
will retry to create an entry with a new binding_index. This creation
|
||||
will be retried up to db_api.MAX_RETRIES times.
|
||||
will be retried up to lib_db_api.MAX_RETRIES times.
|
||||
If, still in the HA router case, the creation failed because the
|
||||
router has already been bound to the l3 agent in question or has been
|
||||
removed (by a concurrent operation), then no further attempts will be
|
||||
@ -286,6 +286,8 @@ class L3Scheduler(object):
|
||||
port_binding = utils.create_object_with_dependency(
|
||||
creator, dep_getter, dep_creator,
|
||||
dep_id_attr, dep_deleter)[0]
|
||||
# NOTE(ralonsoh): to be migrated to the new facade that can't be
|
||||
# used with "create_object_with_dependency".
|
||||
with lib_db_api.autonested_transaction(context.session):
|
||||
port_binding.l3_agent_id = agent['id']
|
||||
except db_exc.DBDuplicateEntry:
|
||||
|
@ -20,6 +20,7 @@ import datetime
|
||||
import mock
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import context
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as exc
|
||||
@ -365,7 +366,7 @@ class TestAgentExtRpcCallback(TestAgentsDbBase):
|
||||
mock.ANY, mock.ANY, mock.ANY, TEST_RESOURCE_VERSIONS)
|
||||
|
||||
def _take_down_agent(self):
|
||||
with self.context.session.begin(subtransactions=True):
|
||||
with db_api.CONTEXT_WRITER.using(self.context):
|
||||
pager = base.Pager(limit=1)
|
||||
agent_objs = agent_obj.Agent.get_objects(self.context,
|
||||
_pager=pager)
|
||||
|
@ -19,6 +19,7 @@ import mock
|
||||
from neutron_lib.api.definitions import dhcpagentscheduler as das_apidef
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import context
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib.plugins import constants as plugin_constants
|
||||
from neutron_lib.plugins import directory
|
||||
from neutron_lib import rpc as n_rpc
|
||||
@ -683,22 +684,20 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
|
||||
def _take_down_agent_and_run_reschedule(self, host):
|
||||
# take down the agent on host A and ensure B is alive
|
||||
self.adminContext.session.begin(subtransactions=True)
|
||||
query = self.adminContext.session.query(agent_model.Agent)
|
||||
agt = query.filter_by(host=host).first()
|
||||
agt.heartbeat_timestamp = (
|
||||
agt.heartbeat_timestamp - datetime.timedelta(hours=1))
|
||||
self.adminContext.session.commit()
|
||||
with db_api.CONTEXT_WRITER.using(self.adminContext):
|
||||
query = self.adminContext.session.query(agent_model.Agent)
|
||||
agt = query.filter_by(host=host).first()
|
||||
agt.heartbeat_timestamp = (
|
||||
agt.heartbeat_timestamp - datetime.timedelta(hours=1))
|
||||
|
||||
plugin = directory.get_plugin(plugin_constants.L3)
|
||||
plugin.reschedule_routers_from_down_agents()
|
||||
|
||||
def _set_agent_admin_state_up(self, host, state):
|
||||
self.adminContext.session.begin(subtransactions=True)
|
||||
query = self.adminContext.session.query(agent_model.Agent)
|
||||
agt_db = query.filter_by(host=host).first()
|
||||
agt_db.admin_state_up = state
|
||||
self.adminContext.session.commit()
|
||||
with db_api.CONTEXT_WRITER.using(self.adminContext):
|
||||
query = self.adminContext.session.query(agent_model.Agent)
|
||||
agt_db = query.filter_by(host=host).first()
|
||||
agt_db.admin_state_up = state
|
||||
|
||||
def test_router_rescheduler_catches_rpc_db_and_reschedule_exceptions(self):
|
||||
with self.router():
|
||||
|
Loading…
x
Reference in New Issue
Block a user