Merge "Relocate Agent DB model"
This commit is contained in:
commit
890f5ed0fd
neutron
agent/common
conf/agent/database
db
agents_db.pyagentschedulers_db.pyl3_agentschedulers_db.pyl3_hamode_db.pyl3_hascheduler_db.py
migration/models
models
network_dhcp_agent_binding
plugins/ml2
scheduler
tests/unit
api/rpc/agentnotifiers
db
scheduler
services/metering
@ -17,10 +17,12 @@ import os
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from neutron._i18n import _LE
|
||||
from neutron.agent.common import config
|
||||
from neutron.common import utils as neutron_utils
|
||||
from neutron.conf.agent.database import agents_db
|
||||
|
||||
|
||||
if os.name == 'nt':
|
||||
@ -31,6 +33,7 @@ else:
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
config.register_root_helper(cfg.CONF)
|
||||
agents_db.register_agent_opts()
|
||||
|
||||
INTERFACE_NAMESPACE = 'neutron.interface_drivers'
|
||||
|
||||
@ -53,3 +56,8 @@ def load_interface_driver(conf):
|
||||
LOG.error(_LE("Error loading interface driver '%s'"),
|
||||
conf.interface_driver)
|
||||
raise SystemExit(1)
|
||||
|
||||
|
||||
def is_agent_down(heart_beat_time):
|
||||
return timeutils.is_older_than(heart_beat_time,
|
||||
cfg.CONF.agent_down_time)
|
||||
|
0
neutron/conf/agent/database/__init__.py
Normal file
0
neutron/conf/agent/database/__init__.py
Normal file
28
neutron/conf/agent/database/agents_db.py
Normal file
28
neutron/conf/agent/database/agents_db.py
Normal file
@ -0,0 +1,28 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from neutron._i18n import _
|
||||
|
||||
AGENT_OPTS = [
|
||||
cfg.IntOpt('agent_down_time', default=75,
|
||||
help=_("Seconds to regard the agent is down; should be at "
|
||||
"least twice report_interval, to be sure the "
|
||||
"agent is down for good.")),
|
||||
]
|
||||
|
||||
|
||||
def register_agent_opts(cfg=cfg.CONF):
|
||||
# NOTE(tonytan4ever): will centralize all agent config options from
|
||||
# another patch. see https://review.openstack.org/#/c/344877
|
||||
cfg.register_opts(AGENT_OPTS)
|
@ -15,10 +15,10 @@
|
||||
|
||||
import datetime
|
||||
|
||||
import debtcollector
|
||||
from eventlet import greenthread
|
||||
from neutron_lib.api import converters
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.db import model_base
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
@ -26,19 +26,20 @@ from oslo_serialization import jsonutils
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.orm import exc
|
||||
from sqlalchemy import sql
|
||||
|
||||
from neutron._i18n import _, _LE, _LI, _LW
|
||||
from neutron.agent.common import utils
|
||||
from neutron.api.rpc.callbacks import version_manager
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.callbacks import resources
|
||||
from neutron.common import _deprecate
|
||||
from neutron.common import constants as n_const
|
||||
from neutron import context
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.extensions import agent as ext_agent
|
||||
from neutron.extensions import availability_zone as az_ext
|
||||
from neutron import manager
|
||||
@ -46,10 +47,6 @@ from neutron import manager
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
AGENT_OPTS = [
|
||||
cfg.IntOpt('agent_down_time', default=75,
|
||||
help=_("Seconds to regard the agent is down; should be at "
|
||||
"least twice report_interval, to be sure the "
|
||||
"agent is down for good.")),
|
||||
cfg.StrOpt('dhcp_load_type', default='networks',
|
||||
choices=['networks', 'subnets', 'ports'],
|
||||
help=_('Representing the resource type whose load is being '
|
||||
@ -81,45 +78,7 @@ cfg.CONF.register_opts(AGENT_OPTS)
|
||||
DOWNTIME_VERSIONS_RATIO = 2
|
||||
|
||||
|
||||
class Agent(model_base.BASEV2, model_base.HasId):
|
||||
"""Represents agents running in neutron deployments."""
|
||||
|
||||
__table_args__ = (
|
||||
sa.UniqueConstraint('agent_type', 'host',
|
||||
name='uniq_agents0agent_type0host'),
|
||||
model_base.BASEV2.__table_args__
|
||||
)
|
||||
|
||||
# L3 agent, DHCP agent, OVS agent, LinuxBridge
|
||||
agent_type = sa.Column(sa.String(255), nullable=False)
|
||||
binary = sa.Column(sa.String(255), nullable=False)
|
||||
# TOPIC is a fanout exchange topic
|
||||
topic = sa.Column(sa.String(255), nullable=False)
|
||||
# TOPIC.host is a target topic
|
||||
host = sa.Column(sa.String(255), nullable=False)
|
||||
availability_zone = sa.Column(sa.String(255))
|
||||
admin_state_up = sa.Column(sa.Boolean, default=True,
|
||||
server_default=sql.true(), nullable=False)
|
||||
# the time when first report came from agents
|
||||
created_at = sa.Column(sa.DateTime, nullable=False)
|
||||
# the time when first report came after agents start
|
||||
started_at = sa.Column(sa.DateTime, nullable=False)
|
||||
# updated when agents report
|
||||
heartbeat_timestamp = sa.Column(sa.DateTime, nullable=False)
|
||||
# description is note for admin user
|
||||
description = sa.Column(sa.String(attributes.DESCRIPTION_MAX_LEN))
|
||||
# configurations: a json dict string, I think 4095 is enough
|
||||
configurations = sa.Column(sa.String(4095), nullable=False)
|
||||
# resource_versions: json dict, 8191 allows for ~256 resource versions
|
||||
# assuming ~32byte length "'name': 'ver',"
|
||||
# the whole row limit is 65535 bytes in mysql
|
||||
resource_versions = sa.Column(sa.String(8191))
|
||||
# load - number of resources hosted by the agent
|
||||
load = sa.Column(sa.Integer, server_default='0', nullable=False)
|
||||
|
||||
@property
|
||||
def is_active(self):
|
||||
return not AgentDbMixin.is_agent_down(self.heartbeat_timestamp)
|
||||
_deprecate._moved_global('Agent', new_module=agent_model)
|
||||
|
||||
|
||||
class AgentAvailabilityZoneMixin(az_ext.AvailabilityZonePluginBase):
|
||||
@ -127,9 +86,11 @@ class AgentAvailabilityZoneMixin(az_ext.AvailabilityZonePluginBase):
|
||||
|
||||
def _list_availability_zones(self, context, filters=None):
|
||||
result = {}
|
||||
query = self._get_collection_query(context, Agent, filters=filters)
|
||||
columns = (Agent.admin_state_up, Agent.availability_zone,
|
||||
Agent.agent_type)
|
||||
query = self._get_collection_query(context, agent_model.Agent,
|
||||
filters=filters)
|
||||
columns = (agent_model.Agent.admin_state_up,
|
||||
agent_model.Agent.availability_zone,
|
||||
agent_model.Agent.agent_type)
|
||||
for agent in query.with_entities(*columns).group_by(*columns):
|
||||
if not agent.availability_zone:
|
||||
continue
|
||||
@ -168,9 +129,12 @@ class AgentAvailabilityZoneMixin(az_ext.AvailabilityZonePluginBase):
|
||||
agent_type = constants.AGENT_TYPE_L3
|
||||
else:
|
||||
return
|
||||
query = context.session.query(Agent.availability_zone).filter_by(
|
||||
agent_type=agent_type).group_by(Agent.availability_zone)
|
||||
query = query.filter(Agent.availability_zone.in_(availability_zones))
|
||||
query = context.session.query(
|
||||
agent_model.Agent.availability_zone).filter_by(
|
||||
agent_type=agent_type).group_by(
|
||||
agent_model.Agent.availability_zone)
|
||||
query = query.filter(
|
||||
agent_model.Agent.availability_zone.in_(availability_zones))
|
||||
azs = [item[0] for item in query]
|
||||
diff = set(availability_zones) - set(azs)
|
||||
if diff:
|
||||
@ -182,7 +146,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
|
||||
def _get_agent(self, context, id):
|
||||
try:
|
||||
agent = self._get_by_id(context, Agent, id)
|
||||
agent = self._get_by_id(context, agent_model.Agent, id)
|
||||
except exc.NoResultFound:
|
||||
raise ext_agent.AgentNotFound(id=id)
|
||||
return agent
|
||||
@ -190,25 +154,31 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
@db_api.retry_if_session_inactive()
|
||||
def get_enabled_agent_on_host(self, context, agent_type, host):
|
||||
"""Return agent of agent_type for the specified host."""
|
||||
query = context.session.query(Agent)
|
||||
query = query.filter(Agent.agent_type == agent_type,
|
||||
Agent.host == host,
|
||||
Agent.admin_state_up == sql.true())
|
||||
query = context.session.query(agent_model.Agent)
|
||||
query = query.filter(agent_model.Agent.agent_type == agent_type,
|
||||
agent_model.Agent.host == host,
|
||||
agent_model.Agent.admin_state_up == sql.true())
|
||||
try:
|
||||
agent = query.one()
|
||||
except exc.NoResultFound:
|
||||
LOG.debug('No enabled %(agent_type)s agent on host '
|
||||
'%(host)s', {'agent_type': agent_type, 'host': host})
|
||||
return
|
||||
if self.is_agent_down(agent.heartbeat_timestamp):
|
||||
|
||||
if utils.is_agent_down(agent.heartbeat_timestamp):
|
||||
LOG.warning(_LW('%(agent_type)s agent %(agent_id)s is not active'),
|
||||
{'agent_type': agent_type, 'agent_id': agent.id})
|
||||
return agent
|
||||
|
||||
@debtcollector.removals.remove(
|
||||
message="This will be removed in the future. "
|
||||
"Please use 'neutron.agent.common.utils.is_agent_down' "
|
||||
"instead.",
|
||||
version='ocata'
|
||||
)
|
||||
@staticmethod
|
||||
def is_agent_down(heart_beat_time):
|
||||
return timeutils.is_older_than(heart_beat_time,
|
||||
cfg.CONF.agent_down_time)
|
||||
return utils.is_agent_down(heart_beat_time)
|
||||
|
||||
@staticmethod
|
||||
def is_agent_considered_for_versions(agent_dict):
|
||||
@ -249,7 +219,9 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
ext_agent.RESOURCE_NAME + 's')
|
||||
res = dict((k, agent[k]) for k in attr
|
||||
if k not in ['alive', 'configurations'])
|
||||
res['alive'] = not self.is_agent_down(res['heartbeat_timestamp'])
|
||||
res['alive'] = not utils.is_agent_down(
|
||||
res['heartbeat_timestamp']
|
||||
)
|
||||
res['configurations'] = self._get_dict(agent, 'configurations')
|
||||
res['resource_versions'] = self._get_dict(agent, 'resource_versions',
|
||||
ignore_missing=True)
|
||||
@ -274,12 +246,14 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def get_agents_db(self, context, filters=None):
|
||||
query = self._get_collection_query(context, Agent, filters=filters)
|
||||
query = self._get_collection_query(context,
|
||||
agent_model.Agent,
|
||||
filters=filters)
|
||||
return query.all()
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def get_agents(self, context, filters=None, fields=None):
|
||||
agents = self._get_collection(context, Agent,
|
||||
agents = self._get_collection(context, agent_model.Agent,
|
||||
self._make_agent_dict,
|
||||
filters=filters, fields=fields)
|
||||
alive = filters and filters.get('alive', None)
|
||||
@ -310,10 +284,10 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
len(agents))
|
||||
|
||||
def _get_agent_by_type_and_host(self, context, agent_type, host):
|
||||
query = self._model_query(context, Agent)
|
||||
query = self._model_query(context, agent_model.Agent)
|
||||
try:
|
||||
agent_db = query.filter(Agent.agent_type == agent_type,
|
||||
Agent.host == host).one()
|
||||
agent_db = query.filter(agent_model.Agent.agent_type == agent_type,
|
||||
agent_model.Agent.host == host).one()
|
||||
return agent_db
|
||||
except exc.NoResultFound:
|
||||
raise ext_agent.AgentNotFoundByTypeHost(agent_type=agent_type,
|
||||
@ -397,7 +371,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
res['started_at'] = current_time
|
||||
res['heartbeat_timestamp'] = current_time
|
||||
res['admin_state_up'] = cfg.CONF.enable_new_agents
|
||||
agent_db = Agent(**res)
|
||||
agent_db = agent_model.Agent(**res)
|
||||
greenthread.sleep(0)
|
||||
context.session.add(agent_db)
|
||||
event_type = events.AFTER_CREATE
|
||||
@ -528,3 +502,6 @@ class AgentExtRpcCallback(object):
|
||||
"%(diff)s seconds, which is more than the "
|
||||
"threshold agent down"
|
||||
"time: %(threshold)s."), log_dict)
|
||||
|
||||
|
||||
_deprecate._MovedGlobals()
|
||||
|
@ -33,6 +33,7 @@ from neutron.common import utils
|
||||
from neutron import context as ncontext
|
||||
from neutron.db import agents_db
|
||||
from neutron.db.availability_zone import network as network_az
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.db.network_dhcp_agent_binding import models as ndab_model
|
||||
from neutron.extensions import agent as ext_agent
|
||||
from neutron.extensions import dhcpagentscheduler
|
||||
@ -381,9 +382,9 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
|
||||
try:
|
||||
down_bindings = (
|
||||
context.session.query(ndab_model.NetworkDhcpAgentBinding).
|
||||
join(agents_db.Agent).
|
||||
filter(agents_db.Agent.heartbeat_timestamp < cutoff,
|
||||
agents_db.Agent.admin_state_up))
|
||||
join(agent_model.Agent).
|
||||
filter(agent_model.Agent.heartbeat_timestamp < cutoff,
|
||||
agent_model.Agent.admin_state_up))
|
||||
dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
|
||||
dead_bindings = [b for b in
|
||||
self._filter_bindings(context, down_bindings)]
|
||||
@ -452,9 +453,9 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
|
||||
query = query.filter(
|
||||
ndab_model.NetworkDhcpAgentBinding.network_id.in_(network_ids))
|
||||
if hosts:
|
||||
query = query.filter(agents_db.Agent.host.in_(hosts))
|
||||
query = query.filter(agent_model.Agent.host.in_(hosts))
|
||||
if admin_state_up is not None:
|
||||
query = query.filter(agents_db.Agent.admin_state_up ==
|
||||
query = query.filter(agent_model.Agent.admin_state_up ==
|
||||
admin_state_up)
|
||||
|
||||
return [binding.dhcp_agent
|
||||
|
@ -32,6 +32,7 @@ from neutron.common import _deprecate
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.db.models import l3_attrs
|
||||
from neutron.db.models import l3agent as rb_model
|
||||
from neutron.extensions import l3agentscheduler
|
||||
@ -107,9 +108,9 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
||||
cutoff = self.get_cutoff_time(agent_dead_limit)
|
||||
return (context.session.query(
|
||||
rb_model.RouterL3AgentBinding).
|
||||
join(agents_db.Agent).
|
||||
filter(agents_db.Agent.heartbeat_timestamp < cutoff,
|
||||
agents_db.Agent.admin_state_up).outerjoin(
|
||||
join(agent_model.Agent).
|
||||
filter(agent_model.Agent.heartbeat_timestamp < cutoff,
|
||||
agent_model.Agent.admin_state_up).outerjoin(
|
||||
l3_attrs.RouterExtraAttributes,
|
||||
l3_attrs.RouterExtraAttributes.router_id ==
|
||||
rb_model.RouterL3AgentBinding.router_id).filter(
|
||||
@ -390,7 +391,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
||||
query = query.filter(
|
||||
rb_model.RouterL3AgentBinding.router_id.in_(router_ids))
|
||||
if admin_state_up is not None:
|
||||
query = (query.filter(agents_db.Agent.admin_state_up ==
|
||||
query = (query.filter(agent_model.Agent.admin_state_up ==
|
||||
admin_state_up))
|
||||
l3_agents = [binding.l3_agent for binding in query]
|
||||
if active is not None:
|
||||
@ -417,14 +418,14 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
||||
binding in bindings]}
|
||||
|
||||
def get_l3_agents(self, context, active=None, filters=None):
|
||||
query = context.session.query(agents_db.Agent)
|
||||
query = context.session.query(agent_model.Agent)
|
||||
query = query.filter(
|
||||
agents_db.Agent.agent_type == constants.AGENT_TYPE_L3)
|
||||
agent_model.Agent.agent_type == constants.AGENT_TYPE_L3)
|
||||
if active is not None:
|
||||
query = (query.filter(agents_db.Agent.admin_state_up == active))
|
||||
query = (query.filter(agent_model.Agent.admin_state_up == active))
|
||||
if filters:
|
||||
for key, value in six.iteritems(filters):
|
||||
column = getattr(agents_db.Agent, key, None)
|
||||
column = getattr(agent_model.Agent, key, None)
|
||||
if column:
|
||||
if not value:
|
||||
return []
|
||||
@ -434,7 +435,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
||||
if agent_modes:
|
||||
agent_mode_key = '\"agent_mode\": \"'
|
||||
configuration_filter = (
|
||||
[agents_db.Agent.configurations.contains('%s%s\"' %
|
||||
[agent_model.Agent.configurations.contains('%s%s\"' %
|
||||
(agent_mode_key, agent_mode))
|
||||
for agent_mode in agent_modes])
|
||||
query = query.filter(or_(*configuration_filter))
|
||||
@ -506,15 +507,15 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
||||
if not agent_ids:
|
||||
return None
|
||||
query = context.session.query(
|
||||
agents_db.Agent,
|
||||
agent_model.Agent,
|
||||
func.count(
|
||||
rb_model.RouterL3AgentBinding.router_id
|
||||
).label('count')).outerjoin(
|
||||
rb_model.RouterL3AgentBinding).group_by(
|
||||
agents_db.Agent.id,
|
||||
agent_model.Agent.id,
|
||||
rb_model.RouterL3AgentBinding
|
||||
.l3_agent_id).order_by('count')
|
||||
res = query.filter(agents_db.Agent.id.in_(agent_ids)).first()
|
||||
res = query.filter(agent_model.Agent.id.in_(agent_ids)).first()
|
||||
return res[0]
|
||||
|
||||
def get_hosts_to_notify(self, context, router_id):
|
||||
|
@ -34,12 +34,12 @@ from neutron.api.v2 import attributes
|
||||
from neutron.common import _deprecate
|
||||
from neutron.common import constants as n_const
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db.availability_zone import router as router_az_db
|
||||
from neutron.db import common_db_mixin
|
||||
from neutron.db import l3_dvr_db
|
||||
from neutron.db.l3_dvr_db import is_distributed_router
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.db.models import l3 as l3_models
|
||||
from neutron.db.models import l3_attrs
|
||||
from neutron.db.models import l3ha as l3ha_model
|
||||
@ -592,8 +592,8 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
||||
query = context.session.query(l3ha_model.L3HARouterAgentPortBinding)
|
||||
|
||||
if host:
|
||||
query = query.join(agents_db.Agent).filter(
|
||||
agents_db.Agent.host == host)
|
||||
query = query.join(agent_model.Agent).filter(
|
||||
agent_model.Agent.host == host)
|
||||
|
||||
query = query.filter(
|
||||
l3ha_model.L3HARouterAgentPortBinding.router_id.in_(router_ids))
|
||||
|
@ -19,8 +19,8 @@ from sqlalchemy import sql
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.callbacks import resources
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import l3_agentschedulers_db as l3_sch_db
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.db.models import l3 as l3_models
|
||||
from neutron.db.models import l3_attrs
|
||||
from neutron.db.models import l3agent as rb_model
|
||||
@ -60,12 +60,12 @@ class L3_HA_scheduler_db_mixin(l3_sch_db.AZL3AgentSchedulerDbMixin):
|
||||
def get_l3_agents_ordered_by_num_routers(self, context, agent_ids):
|
||||
if not agent_ids:
|
||||
return []
|
||||
query = (context.session.query(agents_db.Agent, func.count(
|
||||
query = (context.session.query(agent_model.Agent, func.count(
|
||||
rb_model.RouterL3AgentBinding.router_id)
|
||||
.label('count')).
|
||||
outerjoin(rb_model.RouterL3AgentBinding).
|
||||
group_by(agents_db.Agent.id).
|
||||
filter(agents_db.Agent.id.in_(agent_ids)).
|
||||
group_by(agent_model.Agent.id).
|
||||
filter(agent_model.Agent.id.in_(agent_ids)).
|
||||
order_by('count'))
|
||||
|
||||
return [record[0] for record in query]
|
||||
|
@ -26,7 +26,6 @@ import os.path
|
||||
from neutron_lib.db import model_base
|
||||
|
||||
from neutron.common import utils
|
||||
from neutron.db import agents_db # noqa
|
||||
from neutron.db import agentschedulers_db # noqa
|
||||
from neutron.db.extra_dhcp_opt import models as edo_models # noqa
|
||||
from neutron.db import l3_dvrscheduler_db # noqa
|
||||
|
59
neutron/db/models/agent.py
Normal file
59
neutron/db/models/agent.py
Normal file
@ -0,0 +1,59 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron_lib.db import model_base
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import sql
|
||||
|
||||
from neutron.agent.common import utils
|
||||
from neutron.api.v2 import attributes
|
||||
|
||||
|
||||
class Agent(model_base.BASEV2, model_base.HasId):
|
||||
"""Represents agents running in neutron deployments."""
|
||||
|
||||
__table_args__ = (
|
||||
sa.UniqueConstraint('agent_type', 'host',
|
||||
name='uniq_agents0agent_type0host'),
|
||||
model_base.BASEV2.__table_args__
|
||||
)
|
||||
|
||||
# L3 agent, DHCP agent, OVS agent, LinuxBridge
|
||||
agent_type = sa.Column(sa.String(255), nullable=False)
|
||||
binary = sa.Column(sa.String(255), nullable=False)
|
||||
# TOPIC is a fanout exchange topic
|
||||
topic = sa.Column(sa.String(255), nullable=False)
|
||||
# TOPIC.host is a target topic
|
||||
host = sa.Column(sa.String(255), nullable=False)
|
||||
availability_zone = sa.Column(sa.String(255))
|
||||
admin_state_up = sa.Column(sa.Boolean, default=True,
|
||||
server_default=sql.true(), nullable=False)
|
||||
# the time when first report came from agents
|
||||
created_at = sa.Column(sa.DateTime, nullable=False)
|
||||
# the time when first report came after agents start
|
||||
started_at = sa.Column(sa.DateTime, nullable=False)
|
||||
# updated when agents report
|
||||
heartbeat_timestamp = sa.Column(sa.DateTime, nullable=False)
|
||||
# description is note for admin user
|
||||
description = sa.Column(sa.String(attributes.DESCRIPTION_MAX_LEN))
|
||||
# configurations: a json dict string, I think 4095 is enough
|
||||
configurations = sa.Column(sa.String(4095), nullable=False)
|
||||
# resource_versions: json dict, 8191 allows for ~256 resource versions
|
||||
# assuming ~32byte length "'name': 'ver',"
|
||||
# the whole row limit is 65535 bytes in mysql
|
||||
resource_versions = sa.Column(sa.String(8191))
|
||||
# load - number of resources hosted by the agent
|
||||
load = sa.Column(sa.Integer, server_default='0', nullable=False)
|
||||
|
||||
@property
|
||||
def is_active(self):
|
||||
return not utils.is_agent_down(self.heartbeat_timestamp)
|
@ -14,7 +14,7 @@ from neutron_lib.db import model_base
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import orm
|
||||
|
||||
from neutron.db import agents_db
|
||||
from neutron.db.models import agent as agent_model
|
||||
|
||||
|
||||
class NetworkDhcpAgentBinding(model_base.BASEV2):
|
||||
@ -23,7 +23,7 @@ class NetworkDhcpAgentBinding(model_base.BASEV2):
|
||||
network_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey("networks.id", ondelete='CASCADE'),
|
||||
primary_key=True)
|
||||
dhcp_agent = orm.relation(agents_db.Agent)
|
||||
dhcp_agent = orm.relation(agent_model.Agent)
|
||||
dhcp_agent_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey("agents.id",
|
||||
ondelete='CASCADE'),
|
||||
|
@ -479,7 +479,7 @@ class PortContext(object):
|
||||
"""Get agents of the specified type on port's host.
|
||||
|
||||
:param agent_type: Agent type identifier
|
||||
:returns: List of agents_db.Agent records
|
||||
:returns: List of neutron.db.models.agent.Agent records
|
||||
"""
|
||||
pass
|
||||
|
||||
|
@ -17,8 +17,8 @@ from neutron_lib import constants as const
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import l3_hamode_db
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.db import models_v2
|
||||
from neutron.plugins.ml2 import models as ml2_models
|
||||
|
||||
@ -57,8 +57,8 @@ def get_agent_by_host(session, agent_host):
|
||||
"""Return a L2 agent on the host."""
|
||||
|
||||
with session.begin(subtransactions=True):
|
||||
query = session.query(agents_db.Agent)
|
||||
query = query.filter(agents_db.Agent.host == agent_host)
|
||||
query = session.query(agent_model.Agent)
|
||||
query = query.filter(agent_model.Agent.host == agent_host)
|
||||
for agent in query:
|
||||
if get_agent_ip(agent):
|
||||
return agent
|
||||
@ -66,9 +66,9 @@ def get_agent_by_host(session, agent_host):
|
||||
|
||||
def _get_active_network_ports(session, network_id):
|
||||
with session.begin(subtransactions=True):
|
||||
query = session.query(ml2_models.PortBinding, agents_db.Agent)
|
||||
query = query.join(agents_db.Agent,
|
||||
agents_db.Agent.host == ml2_models.PortBinding.host)
|
||||
query = session.query(ml2_models.PortBinding, agent_model.Agent)
|
||||
query = query.join(agent_model.Agent,
|
||||
agent_model.Agent.host == ml2_models.PortBinding.host)
|
||||
query = query.join(models_v2.Port)
|
||||
query = query.filter(models_v2.Port.network_id == network_id,
|
||||
models_v2.Port.status == const.PORT_STATUS_ACTIVE)
|
||||
@ -104,9 +104,9 @@ def get_nondistributed_active_network_ports(session, network_id):
|
||||
def get_dvr_active_network_ports(session, network_id):
|
||||
with session.begin(subtransactions=True):
|
||||
query = session.query(ml2_models.DistributedPortBinding,
|
||||
agents_db.Agent)
|
||||
query = query.join(agents_db.Agent,
|
||||
agents_db.Agent.host ==
|
||||
agent_model.Agent)
|
||||
query = query.join(agent_model.Agent,
|
||||
agent_model.Agent.host ==
|
||||
ml2_models.DistributedPortBinding.host)
|
||||
query = query.join(models_v2.Port)
|
||||
query = query.filter(models_v2.Port.network_id == network_id,
|
||||
@ -128,10 +128,10 @@ def get_ha_active_network_ports(session, network_id):
|
||||
|
||||
|
||||
def get_ha_agents(session, network_id=None, router_id=None):
|
||||
query = session.query(agents_db.Agent.host).distinct()
|
||||
query = session.query(agent_model.Agent.host).distinct()
|
||||
query = query.join(l3_hamode_db.L3HARouterAgentPortBinding,
|
||||
l3_hamode_db.L3HARouterAgentPortBinding.l3_agent_id ==
|
||||
agents_db.Agent.id)
|
||||
agent_model.Agent.id)
|
||||
if router_id:
|
||||
query = query.filter(
|
||||
l3_hamode_db.L3HARouterAgentPortBinding.router_id == router_id)
|
||||
@ -145,8 +145,8 @@ def get_ha_agents(session, network_id=None, router_id=None):
|
||||
return []
|
||||
# L3HARouterAgentPortBinding will have l3 agent ids of hosting agents.
|
||||
# But we need l2 agent(for tunneling ip) while creating FDB entries.
|
||||
agents_query = session.query(agents_db.Agent)
|
||||
agents_query = agents_query.filter(agents_db.Agent.host.in_(query))
|
||||
agents_query = session.query(agent_model.Agent)
|
||||
agents_query = agents_query.filter(agent_model.Agent.host.in_(query))
|
||||
return [agent for agent in agents_query
|
||||
if get_agent_ip(agent)]
|
||||
|
||||
@ -187,6 +187,6 @@ def get_ha_router_active_port_count(session, agent_host, network_id):
|
||||
# Return num of HA router interfaces on the given network and host
|
||||
query = _ha_router_interfaces_on_network_query(session, network_id)
|
||||
query = query.filter(models_v2.Port.status == const.PORT_STATUS_ACTIVE)
|
||||
query = query.join(agents_db.Agent)
|
||||
query = query.filter(agents_db.Agent.host == agent_host)
|
||||
query = query.join(agent_model.Agent)
|
||||
query = query.filter(agent_model.Agent.host == agent_host)
|
||||
return query.count()
|
||||
|
@ -26,6 +26,7 @@ from sqlalchemy import sql
|
||||
from neutron._i18n import _LI, _LW
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.db.models import segment as segment_model
|
||||
from neutron.db.network_dhcp_agent_binding import models as ndab_model
|
||||
from neutron.extensions import availability_zone as az_ext
|
||||
@ -58,11 +59,11 @@ class AutoScheduler(object):
|
||||
if not net_ids:
|
||||
LOG.debug('No non-hosted networks')
|
||||
return False
|
||||
query = context.session.query(agents_db.Agent)
|
||||
query = query.filter(agents_db.Agent.agent_type ==
|
||||
constants.AGENT_TYPE_DHCP,
|
||||
agents_db.Agent.host == host,
|
||||
agents_db.Agent.admin_state_up == sql.true())
|
||||
query = context.session.query(agent_model.Agent)
|
||||
query = query.filter(
|
||||
agent_model.Agent.agent_type == constants.AGENT_TYPE_DHCP,
|
||||
agent_model.Agent.host == host,
|
||||
agent_model.Agent.admin_state_up == sql.true())
|
||||
dhcp_agents = query.all()
|
||||
|
||||
query = context.session.query(
|
||||
|
@ -23,8 +23,8 @@ from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.callbacks import resources
|
||||
from neutron.common import utils
|
||||
from neutron.db import agents_db
|
||||
from neutron.db.agentschedulers_db import cfg
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
@ -57,7 +57,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||
self.assertEqual(expected_warnings, self.mock_log.warning.call_count)
|
||||
|
||||
def test__schedule_network(self):
|
||||
agent = agents_db.Agent()
|
||||
agent = agent_model.Agent()
|
||||
agent.admin_state_up = True
|
||||
agent.heartbeat_timestamp = timeutils.utcnow()
|
||||
network = {'id': 'foo_net_id'}
|
||||
@ -66,7 +66,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||
expected_casts=1, expected_warnings=0)
|
||||
|
||||
def test__schedule_network_no_existing_agents(self):
|
||||
agent = agents_db.Agent()
|
||||
agent = agent_model.Agent()
|
||||
agent.admin_state_up = True
|
||||
agent.heartbeat_timestamp = timeutils.utcnow()
|
||||
network = {'id': 'foo_net_id'}
|
||||
@ -93,20 +93,20 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||
self.assertEqual(expected_errors, self.mock_log.error.call_count)
|
||||
|
||||
def test__get_enabled_agents(self):
|
||||
agent1 = agents_db.Agent()
|
||||
agent1 = agent_model.Agent()
|
||||
agent1.admin_state_up = True
|
||||
agent1.heartbeat_timestamp = timeutils.utcnow()
|
||||
agent2 = agents_db.Agent()
|
||||
agent2 = agent_model.Agent()
|
||||
agent2.admin_state_up = False
|
||||
agent2.heartbeat_timestamp = timeutils.utcnow()
|
||||
network = {'id': 'foo_network_id'}
|
||||
self._test__get_enabled_agents(network, agents=[agent1])
|
||||
|
||||
def test__get_enabled_agents_with_inactive_ones(self):
|
||||
agent1 = agents_db.Agent()
|
||||
agent1 = agent_model.Agent()
|
||||
agent1.admin_state_up = True
|
||||
agent1.heartbeat_timestamp = timeutils.utcnow()
|
||||
agent2 = agents_db.Agent()
|
||||
agent2 = agent_model.Agent()
|
||||
agent2.admin_state_up = True
|
||||
# This is effectively an inactive agent
|
||||
agent2.heartbeat_timestamp = datetime.datetime(2000, 1, 1, 0, 0)
|
||||
@ -117,7 +117,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||
|
||||
def test__get_enabled_agents_with_notification_required(self):
|
||||
network = {'id': 'foo_network_id', 'subnets': ['foo_subnet_id']}
|
||||
agent = agents_db.Agent()
|
||||
agent = agent_model.Agent()
|
||||
agent.admin_state_up = False
|
||||
agent.heartbeat_timestamp = timeutils.utcnow()
|
||||
self._test__get_enabled_agents(network, [agent], port_count=20,
|
||||
@ -126,10 +126,10 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||
def test__get_enabled_agents_with_admin_state_down(self):
|
||||
cfg.CONF.set_override(
|
||||
'enable_services_on_agents_with_admin_state_down', True)
|
||||
agent1 = agents_db.Agent()
|
||||
agent1 = agent_model.Agent()
|
||||
agent1.admin_state_up = True
|
||||
agent1.heartbeat_timestamp = timeutils.utcnow()
|
||||
agent2 = agents_db.Agent()
|
||||
agent2 = agent_model.Agent()
|
||||
agent2.admin_state_up = False
|
||||
agent2.heartbeat_timestamp = timeutils.utcnow()
|
||||
network = {'id': 'foo_network_id'}
|
||||
@ -145,7 +145,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||
self, function, expected_scheduling=0, expected_casts=0):
|
||||
with mock.patch.object(self.notifier, '_schedule_network') as f:
|
||||
with mock.patch.object(self.notifier, '_get_enabled_agents') as g:
|
||||
agent = agents_db.Agent()
|
||||
agent = agent_model.Agent()
|
||||
agent.admin_state_up = True
|
||||
agent.heartbeat_timestamp = timeutils.utcnow()
|
||||
g.return_value = [agent]
|
||||
|
@ -28,6 +28,7 @@ import testscenarios
|
||||
from neutron import context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import db_base_plugin_v2 as base_plugin
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
# the below code is required for the following reason
|
||||
@ -61,7 +62,7 @@ class TestAgentsDbBase(testlib_api.SqlTestCase):
|
||||
|
||||
def _get_agents(self, hosts, agent_type):
|
||||
return [
|
||||
agents_db.Agent(
|
||||
agent_model.Agent(
|
||||
binary='foo-agent',
|
||||
host=host,
|
||||
agent_type=agent_type,
|
||||
@ -359,7 +360,7 @@ class TestAgentExtRpcCallback(TestAgentsDbBase):
|
||||
|
||||
def _take_down_agent(self):
|
||||
with self.context.session.begin(subtransactions=True):
|
||||
query = self.context.session.query(agents_db.Agent)
|
||||
query = self.context.session.query(agent_model.Agent)
|
||||
agt = query.first()
|
||||
agt.heartbeat_timestamp = (
|
||||
agt.heartbeat_timestamp - datetime.timedelta(hours=1))
|
||||
|
@ -32,6 +32,7 @@ from neutron.common import constants as n_const
|
||||
from neutron import context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.db.models import l3agent as rb_model
|
||||
from neutron.extensions import agent
|
||||
from neutron.extensions import dhcpagentscheduler
|
||||
@ -667,7 +668,7 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
def _take_down_agent_and_run_reschedule(self, host):
|
||||
# take down the agent on host A and ensure B is alive
|
||||
self.adminContext.session.begin(subtransactions=True)
|
||||
query = self.adminContext.session.query(agents_db.Agent)
|
||||
query = self.adminContext.session.query(agent_model.Agent)
|
||||
agt = query.filter_by(host=host).first()
|
||||
agt.heartbeat_timestamp = (
|
||||
agt.heartbeat_timestamp - datetime.timedelta(hours=1))
|
||||
@ -680,7 +681,7 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
|
||||
def _set_agent_admin_state_up(self, host, state):
|
||||
self.adminContext.session.begin(subtransactions=True)
|
||||
query = self.adminContext.session.query(agents_db.Agent)
|
||||
query = self.adminContext.session.query(agent_model.Agent)
|
||||
agt_db = query.filter_by(host=host).first()
|
||||
agt_db.admin_state_up = state
|
||||
self.adminContext.session.commit()
|
||||
|
@ -22,6 +22,7 @@ import sqlalchemy as sa
|
||||
from sqlalchemy import orm
|
||||
import testtools
|
||||
|
||||
from neutron.agent.common import utils as agent_utils
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.common import constants as n_const
|
||||
from neutron import context
|
||||
@ -200,7 +201,7 @@ class L3HATestCase(L3HATestFramework):
|
||||
router = self._create_router()
|
||||
self.plugin.update_routers_states(
|
||||
self.admin_ctx, {router['id']: 'active'}, self.agent1['host'])
|
||||
with mock.patch.object(agents_db.AgentDbMixin, 'is_agent_down',
|
||||
with mock.patch.object(agent_utils, 'is_agent_down',
|
||||
return_value=True):
|
||||
self._assert_ha_state_for_agent_is_standby(router, self.agent1)
|
||||
|
||||
|
@ -28,13 +28,13 @@ import testscenarios
|
||||
import testtools
|
||||
|
||||
from neutron import context as n_context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import db_base_plugin_v2 as db_v2
|
||||
from neutron.db import l3_db
|
||||
from neutron.db import l3_dvr_ha_scheduler_db
|
||||
from neutron.db import l3_dvrscheduler_db
|
||||
from neutron.db import l3_hamode_db
|
||||
from neutron.db import l3_hascheduler_db
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.db.models import l3agent as rb_model
|
||||
from neutron.db.models import l3ha as l3ha_model
|
||||
from neutron.extensions import l3
|
||||
@ -260,7 +260,7 @@ class L3SchedulerBaseTestCase(base.BaseTestCase):
|
||||
|
||||
def _test__bind_routers_ha(self, has_binding):
|
||||
routers = [{'id': 'foo_router', 'ha': True, 'tenant_id': '42'}]
|
||||
agent = agents_db.Agent(id='foo_agent')
|
||||
agent = agent_model.Agent(id='foo_agent')
|
||||
with mock.patch.object(self.scheduler,
|
||||
'_router_has_binding',
|
||||
return_value=has_binding) as mock_has_binding,\
|
||||
@ -514,7 +514,7 @@ class L3SchedulerTestBaseMixin(object):
|
||||
|
||||
def _prepare_schedule_dvr_tests(self):
|
||||
scheduler = l3_agent_scheduler.ChanceScheduler()
|
||||
agent = agents_db.Agent()
|
||||
agent = agent_model.Agent()
|
||||
agent.admin_state_up = True
|
||||
agent.heartbeat_timestamp = timeutils.utcnow()
|
||||
plugin = mock.Mock()
|
||||
@ -1413,7 +1413,7 @@ class L3DvrSchedulerTestCase(testlib_api.SqlTestCase):
|
||||
self.assertEqual(0, len(sub_ids))
|
||||
|
||||
def _prepare_schedule_snat_tests(self):
|
||||
agent = agents_db.Agent()
|
||||
agent = agent_model.Agent()
|
||||
agent.admin_state_up = True
|
||||
agent.heartbeat_timestamp = timeutils.utcnow()
|
||||
router = {
|
||||
|
@ -17,9 +17,9 @@ from oslo_utils import uuidutils
|
||||
|
||||
from neutron.api.v2 import attributes as attr
|
||||
from neutron import context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db.metering import metering_rpc
|
||||
from neutron.db.models import agent as agent_model
|
||||
from neutron.extensions import l3 as ext_l3
|
||||
from neutron.extensions import metering as ext_metering
|
||||
from neutron import manager
|
||||
@ -367,8 +367,8 @@ class TestMeteringPluginL3AgentScheduler(
|
||||
'id': second_uuid}]
|
||||
|
||||
# bind each router to a specific agent
|
||||
agent1 = agents_db.Agent(host='agent1')
|
||||
agent2 = agents_db.Agent(host='agent2')
|
||||
agent1 = agent_model.Agent(host='agent1')
|
||||
agent2 = agent_model.Agent(host='agent2')
|
||||
|
||||
agents = {self.uuid: agent1,
|
||||
second_uuid: agent2}
|
||||
|
Loading…
x
Reference in New Issue
Block a user