Merge "Optimize the nova vm name query inside get_gbp_details()"

This commit is contained in:
Zuul 2019-02-01 20:49:32 +00:00 committed by Gerrit Code Review
commit 7177c6f8e4
10 changed files with 288 additions and 13 deletions

View File

@ -0,0 +1,46 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""VM names acquired from Nova API
Revision ID: 4f70135dc089
Revises: 4967af35820f
Create Date: 2019-01-08 14:18:11.909757
"""
# revision identifiers, used by Alembic.
revision = '4f70135dc089'
down_revision = '4967af35820f'
from alembic import op
import sqlalchemy as sa
def upgrade():
    """Create the tables backing the Nova VM-name cache."""
    # Cache table: maps a port's device_id (the Nova server UUID) to the
    # VM display name last seen from the Nova API.
    op.create_table(
        'apic_aim_vm_names',
        sa.Column('device_id', sa.String(36), nullable=False),
        sa.PrimaryKeyConstraint('device_id'),
        sa.Column('vm_name', sa.String(64), nullable=False),
    )
    # Coordination table: intended to hold a single row recording which
    # controller host last refreshed the cache and when (incremental vs.
    # full refresh timestamps are tracked separately).
    op.create_table(
        'apic_aim_vm_name_updates',
        sa.Column('host_id', sa.String(36), nullable=False),
        sa.PrimaryKeyConstraint('host_id'),
        sa.Column('last_incremental_update_time', sa.DateTime()),
        sa.Column('last_full_update_time', sa.DateTime()),
    )
def downgrade():
    """No-op: this project's migrations do not support downgrade."""
    pass

View File

@ -1 +1 @@
4967af35820f
4f70135dc089

View File

@ -57,6 +57,10 @@ apic_opts = [
default=False,
help=("This will use those raw SQL statements to speed "
"up the calculation of the EP file.")),
cfg.IntOpt('apic_nova_vm_name_cache_update_interval', default=60,
help=("How many seconds for the polling thread on each "
"controller should wait before it updates the nova vm "
"name cache again.")),
]

View File

@ -80,6 +80,24 @@ class NetworkMapping(model_base.BASEV2):
vrf_tenant_name = sa.Column(sa.String(64))
class VMName(model_base.BASEV2):
    """Cached Nova VM display name, keyed by the port's device_id."""
    __tablename__ = 'apic_aim_vm_names'

    # Nova server UUID (the port's device_id).
    device_id = sa.Column(sa.String(36), primary_key=True)
    # VM display name as last reported by the Nova API.
    vm_name = sa.Column(sa.String(64))
# At any point of time, there should only be one entry in this table.
# That entry is used to make sure only one controller is actively updating
# the VMName table.
class VMNameUpdate(model_base.BASEV2):
    """Records which controller last refreshed the VMName cache and when."""
    __tablename__ = 'apic_aim_vm_name_updates'

    # Identifier of the controller host that owns the refresh task.
    host_id = sa.Column(sa.String(36), primary_key=True)
    # Timestamp of the most recent (incremental or full) refresh.
    last_incremental_update_time = sa.Column(sa.DateTime)
    # Timestamp of the most recent full refresh.
    last_full_update_time = sa.Column(sa.DateTime)
class DbMixin(object):
def _add_address_scope_mapping(self, session, scope_id, vrf,
vrf_owned=True, update_scope=True):
@ -299,3 +317,61 @@ class DbMixin(object):
def _set_network_vrf(self, mapping, vrf):
mapping.vrf_tenant_name = vrf.tenant_name
mapping.vrf_name = vrf.name
def _get_vm_name(self, session, device_id, is_detailed=False):
    """Return the cached VM-name row for device_id, or None.

    With is_detailed=True the full VMName object is returned (needed for
    in-place update/delete); otherwise only the vm_name column, as a
    one-element row tuple.
    """
    # NOTE: the two BAKERY lambdas are deliberately distinct -- baked
    # queries are cached per lambda code location, so each variant gets
    # its own cache slot.
    if is_detailed:
        query = BAKERY(lambda s: s.query(VMName))
    else:
        query = BAKERY(lambda s: s.query(VMName.vm_name))
    query += lambda q: q.filter_by(
        device_id=sa.bindparam('device_id'))
    return query(session).params(
        device_id=device_id).one_or_none()
def _get_vm_names(self, session):
    """Return all cached (device_id, vm_name) row tuples."""
    query = BAKERY(lambda s: s.query(VMName.device_id,
                                     VMName.vm_name))
    return query(session).all()
def _set_vm_name(self, session, device_id, vm_name):
    """Create or update the cached VM name for device_id."""
    with session.begin(subtransactions=True):
        existing = self._get_vm_name(session, device_id,
                                     is_detailed=True)
        if existing is None:
            session.add(VMName(device_id=device_id, vm_name=vm_name))
        else:
            existing.vm_name = vm_name
def _delete_vm_name(self, session, device_id):
    """Remove the cached VM name for device_id, if one exists."""
    with session.begin(subtransactions=True):
        record = self._get_vm_name(session, device_id,
                                   is_detailed=True)
        if record is not None:
            session.delete(record)
def _get_vm_name_update(self, session):
    """Return the single VMNameUpdate row, or None if no refresh
    has ever been recorded."""
    query = BAKERY(lambda s: s.query(VMNameUpdate))
    return query(session).one_or_none()
def _set_vm_name_update(self, session, db_obj, host_id,
                        last_incremental_update_time,
                        last_full_update_time=None):
    """Record that host_id performed a cache refresh.

    Updates db_obj in place when given (taking over ownership if the
    host changed); otherwise inserts the initial VMNameUpdate row.
    last_full_update_time is only written when provided.
    """
    with session.begin(subtransactions=True):
        if not db_obj:
            session.add(VMNameUpdate(
                host_id=host_id,
                last_incremental_update_time=last_incremental_update_time,
                last_full_update_time=last_full_update_time))
            return
        db_obj.host_id = host_id
        db_obj.last_incremental_update_time = last_incremental_update_time
        if last_full_update_time:
            db_obj.last_full_update_time = last_full_update_time
def _delete_vm_name_update(self, session):
    """Delete the VMNameUpdate row, if one exists."""
    with session.begin(subtransactions=True):
        record = self._get_vm_name_update(session)
        if record is not None:
            session.delete(record)

View File

@ -16,6 +16,7 @@
from collections import defaultdict
from collections import namedtuple
import copy
from datetime import datetime
import netaddr
import os
import re
@ -60,6 +61,7 @@ from neutron_lib import context as nctx
from neutron_lib import exceptions as n_exceptions
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from neutron_lib.utils import net
from opflexagent import constants as ofcst
from opflexagent import host_agent_rpc as arpc
from opflexagent import rpc as ofrpc
@ -67,6 +69,7 @@ from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log
import oslo_messaging
from oslo_service import loopingcall
from oslo_utils import importutils
from gbpservice.network.neutronv2 import local_api
@ -82,6 +85,8 @@ from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import db
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import exceptions
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import extension_db
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import trunk_driver
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
nova_client as nclient)
LOG = log.getLogger(__name__)
@ -258,6 +263,9 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.l3_domain_dn = cfg.CONF.ml2_apic_aim.l3_domain_dn
self.enable_raw_sql_for_device_rpc = (cfg.CONF.ml2_apic_aim.
enable_raw_sql_for_device_rpc)
self.apic_nova_vm_name_cache_update_interval = (cfg.CONF.ml2_apic_aim.
apic_nova_vm_name_cache_update_interval)
self._setup_nova_vm_update()
local_api.QUEUE_OUT_OF_PROCESS_NOTIFICATIONS = True
self._ensure_static_resources()
trunk_driver.register()
@ -266,6 +274,76 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.apic_router_id_pool = cfg.CONF.ml2_apic_aim.apic_router_id_pool
self.apic_router_id_subnet = netaddr.IPSet([self.apic_router_id_pool])
def _setup_nova_vm_update(self):
    """Start the periodic task that refreshes the Nova VM-name cache."""
    self.admin_context = nctx.get_admin_context()
    # Identifies this controller in the apic_aim_vm_name_updates table.
    self.host_id = 'id-%s' % net.get_hostname()
    updater = loopingcall.FixedIntervalLoopingCall(
        self._update_nova_vm_name_cache)
    updater.start(interval=self.apic_nova_vm_name_cache_update_interval)
def _update_nova_vm_name_cache(self):
    """Periodic task: refresh the apic_aim_vm_names cache from Nova.

    Only one controller actively refreshes the cache; the VMNameUpdate
    row records which host that is and when it last ran.  A full refresh
    (all tenants' VMs, including deletions) happens at most every 10
    polling intervals; in between, cheaper incremental refreshes only
    pick up VMs changed inside that window.
    """
    # NOTE(review): datetime.now() is timezone-naive local time; this
    # assumes all controllers share a timezone/clock -- confirm.
    current_time = datetime.now()
    session = self.admin_context.session
    vm_name_update = self._get_vm_name_update(session)
    is_full_update = True
    if vm_name_update:
        # The other controller is still doing the update actively
        if vm_name_update.host_id != self.host_id:
            delta_time = (current_time -
                          vm_name_update.last_incremental_update_time)
            # Stand by unless the owning host has gone quiet for more
            # than two polling intervals (then we take over below).
            if (delta_time.total_seconds() <
                    self.apic_nova_vm_name_cache_update_interval * 2):
                return
        else:
            delta_time = (current_time -
                          vm_name_update.last_full_update_time)
            # A recent-enough full update means only an incremental
            # pass is needed this time.
            if (delta_time.total_seconds() <
                    self.apic_nova_vm_name_cache_update_interval * 10):
                is_full_update = False
    # Claim (or retain) ownership and stamp the refresh times.
    self._set_vm_name_update(session, vm_name_update, self.host_id,
                             current_time,
                             current_time if is_full_update else None)
    # Incremental queries only ask Nova for changes within the last
    # 10 polling intervals.
    nova_vms = nclient.NovaClient().get_servers(
        is_full_update, self.apic_nova_vm_name_cache_update_interval * 10)
    vm_list = []
    for vm in nova_vms:
        vm_list.append((vm.id, vm.name))
    nova_vms = set(vm_list)
    with db_api.context_manager.writer.using(self.admin_context):
        cached_vms = self._get_vm_names(session)
        cached_vms = set(cached_vms)
        # Only handle the deletion during full update otherwise we
        # don't know if the missing VMs are being deleted or just older
        # than 10 minutes as incremental update only queries Nova for
        # the past 10 mins.
        if is_full_update:
            removed_vms = cached_vms - nova_vms
            for device_id, _ in removed_vms:
                self._delete_vm_name(session, device_id)
        added_vms = nova_vms - cached_vms
        update_ports = []
        for device_id, name in added_vms:
            self._set_vm_name(session, device_id, name)
            # Get the port_id for this device_id
            query = BAKERY(lambda s: s.query(
                models_v2.Port.id))
            query += lambda q: q.filter(
                models_v2.Port.device_id == sa.bindparam('device_id'))
            port = query(session).params(
                device_id=device_id).one_or_none()
            if port:
                port_id, = port
                update_ports.append(port_id)
    # Push updates to agents so they re-request details (and the new
    # VM names) for the affected ports.
    if update_ports:
        self._notify_port_update_bulk(self.admin_context, update_ports)
def _query_used_apic_router_ids(self, aim_ctx):
used_ids = netaddr.IPSet()
# Find the l3out_nodes created by us

View File

@ -1927,6 +1927,10 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
subnet['dhcp_server_ports'] = dhcp_ports
return subnets
def _get_nova_vm_name(self, context, port):
    """Look up the cached Nova VM name row for the port's device_id."""
    mech = self.aim_mech_driver
    return mech._get_vm_name(context.session, port['device_id'])
def _send_port_update_notification(self, plugin_context, port):
    """Delegate the port-update notification to the AIM mechanism driver."""
    self.aim_mech_driver._notify_port_update(plugin_context, port)

View File

@ -27,8 +27,6 @@ from oslo_log import log
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import (
mechanism_driver as md)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
nova_client as nclient)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
port_ha_ipaddress_binding as ha_ip_db)
@ -234,11 +232,7 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
# Put per mac-address extra info
'extra_details': {}}
# Set VM name if needed.
if port['device_owner'].startswith(
'compute:') and port['device_id']:
vm = nclient.NovaClient().get_server(port['device_id'])
details['vm-name'] = vm.name if vm else port['device_id']
self._set_nova_vm_name(context, port, details)
details['_cache'] = {}
mtu = self._get_port_mtu(context, port, details)
@ -612,11 +606,7 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
# Put per mac-address extra info
'extra_details': {}}
# Set VM name if needed.
if port['device_owner'].startswith(
'compute:') and port['device_id']:
vm = nclient.NovaClient().get_server(port['device_id'])
details['vm-name'] = vm.name if vm else port['device_id']
self._set_nova_vm_name(context, port, details)
details['_cache'] = {}
self._build_up_details_cache(
@ -698,6 +688,17 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
LOG.debug("Details for port %s : %s", port['id'], details)
return details
def _set_nova_vm_name(self, context, port, details):
    """Populate details['vm-name'] for compute-owned ports.

    Reads the name from the local VM-name cache instead of calling the
    Nova API; falls back to the device_id when the cache has no entry.
    Ports that are not compute-owned (or have no device_id) are left
    untouched.
    """
    if not (port['device_owner'].startswith('compute:') and
            port['device_id']):
        return
    row = self._get_nova_vm_name(context, port)
    details['vm-name'] = row[0] if row else port['device_id']
def _get_owned_addresses(self, plugin_context, port_id):
return set(self.ha_ip_handler.get_ha_ipaddresses_for_port(port_id))

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from neutron.notifiers import nova as n_nova
from novaclient import exceptions as nova_exceptions
from oslo_log import log as logging
@ -41,3 +42,15 @@ class NovaClient(object):
server_id)
except Exception as e:
LOG.exception(e)
def get_servers(self, is_full_update, changes_since_in_sec):
    """List Nova servers across all tenants (id/name only).

    A full update lists every server; an incremental update restricts
    the query to non-deleted servers changed within the last
    changes_since_in_sec seconds.
    """
    opts = {'all_tenants': 1}
    if not is_full_update:
        # NOTE(review): naive local time is used for changes-since --
        # assumes the Nova endpoint interprets it consistently; confirm.
        window_start = (datetime.datetime.now() -
                        datetime.timedelta(seconds=changes_since_in_sec))
        opts['changes-since'] = str(window_start)
        opts['deleted'] = 'false'
    return self.client.servers.list(detailed=False, search_opts=opts)

View File

@ -14,6 +14,7 @@
# under the License.
import copy
import datetime
import fixtures
import mock
import netaddr
@ -241,6 +242,10 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
def setUp(self, mechanism_drivers=None, tenant_network_types=None,
plugin=None, ext_mgr=None):
self.nova_client = mock.patch(
'gbpservice.neutron.services.grouppolicy.drivers.cisco.'
'apic.nova_client.NovaClient.get_servers').start()
self.nova_client.return_value = []
# Enable the test mechanism driver to ensure that
# we can successfully call through to all mechanism
# driver apis.
@ -1731,6 +1736,50 @@ class TestAimMapping(ApicAimTestCase):
mock.call(mock.ANY, tenant)]
self._check_call_list(exp_calls, self.driver.aim.delete.call_args_list)
def test_update_nova_vm_name_cache(self):
    """Exercise the VM-name cache refresh: add, rename, standby, removal."""
    # VM cache is empty to begin with
    self.assertEqual(self.driver._get_vm_names(self.db_session),
                     [])
    # Add one vm; an update pass should insert it into the cache.
    vm = mock.Mock()
    vm.id = 'some_id'
    vm.name = 'some_name'
    self.nova_client.return_value = [vm]
    self.driver._update_nova_vm_name_cache()
    self.assertEqual(self.driver._get_vm_names(self.db_session),
                     [('some_id', 'some_name')])
    # Update vm name; the cached entry should follow.
    vm.name = 'new_name'
    self.nova_client.return_value = [vm]
    self.driver._update_nova_vm_name_cache()
    self.assertEqual(self.driver._get_vm_names(self.db_session),
                     [('some_id', 'new_name')])
    # Simulate the polling thread from the other controller:
    # it should just stand by (cache keeps the owner's data).
    old_id = self.driver.host_id
    self.driver.host_id = 'new_id'
    vm.name = 'old_name'
    self.nova_client.return_value = [vm]
    self.driver._update_nova_vm_name_cache()
    self.assertEqual(self.driver._get_vm_names(self.db_session),
                     [('some_id', 'new_name')])
    # VM removal won't be triggered thru incremental update
    self.driver.host_id = old_id
    self.nova_client.return_value = []
    self.driver._update_nova_vm_name_cache()
    self.assertEqual(self.driver._get_vm_names(self.db_session),
                     [('some_id', 'new_name')])
    # Force a full update (by aging the last full-update timestamp past
    # the 10-interval window) which will take care of the VM removal.
    vm_update_obj = self.driver._get_vm_name_update(self.db_session)
    new_full_update_time = (vm_update_obj.last_full_update_time -
                            datetime.timedelta(minutes=11))
    self.driver._set_vm_name_update(self.db_session, vm_update_obj,
                                    old_id, new_full_update_time,
                                    new_full_update_time)
    self.nova_client.return_value = []
    self.driver._update_nova_vm_name_cache()
    self.assertEqual(self.driver._get_vm_names(self.db_session),
                     [])
def test_multi_scope_routing_with_unscoped_pools(self):
self._test_multi_scope_routing(True)

View File

@ -120,6 +120,10 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
def setUp(self, policy_drivers=None, core_plugin=None, ml2_options=None,
l3_plugin=None, sc_plugin=None, trunk_plugin=None,
qos_plugin=None, **kwargs):
self.nova_client1 = mock.patch(
'gbpservice.neutron.services.grouppolicy.drivers.cisco.'
'apic.nova_client.NovaClient.get_servers').start()
self.nova_client1.return_value = []
core_plugin = core_plugin or ML2PLUS_PLUGIN
if not l3_plugin:
l3_plugin = "apic_aim_l3"