Merge "Cisco APIC ML2 mechanism driver, part 1"

This commit is contained in:
Jenkins 2014-04-29 17:57:19 +00:00 committed by Gerrit Code Review
commit 0cfb142533
11 changed files with 1345 additions and 1 deletions
etc/neutron/plugins/ml2
neutron

@ -46,3 +46,49 @@
# ssh_port=22
# username=admin
# password=mySecretPassword
[ml2_cisco_apic]
# Hostname for the APIC controller
# apic_host=1.1.1.1
# Username for the APIC controller
# apic_username=user
# Password for the APIC controller
# apic_password=password
# Port for the APIC Controller
# apic_port=80
# Names for APIC objects used by Neutron
# Note: When deploying multiple clouds against one APIC,
# these names must be unique between the clouds.
# apic_vmm_domain=openstack
# apic_vlan_ns_name=openstack_ns
# apic_node_profile=openstack_profile
# apic_entity_profile=openstack_entity
# apic_function_profile=openstack_function
# The following flag will cause all the node profiles on the APIC to
# be cleared when neutron-server starts. This is typically used only
# for test environments that require clean-slate startup conditions.
# apic_clear_node_profiles=False
# Specify your network topology.
# This section indicates how your compute nodes are connected to the fabric's
# switches and ports. The format is as follows:
#
# [switch:<swich_id_from_the_apic>]
# <compute_host>,<compute_host>=<switchport_the_host(s)_are_connected_to>
#
# You can have multiple sections, one for each switch in your fabric that is
# participating in Openstack. e.g.
#
# [switch:17]
# ubuntu,ubuntu1=1/10
# ubuntu2,ubuntu3=1/11
#
# [switch:18]
# ubuntu5,ubuntu6=1/1
# ubuntu7,ubuntu8=1/2

@ -0,0 +1,74 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Cisco APIC Mechanism Driver
Revision ID: 1b837a7125a9
Revises: 6be312499f9
Create Date: 2014-02-13 09:35:19.147619
"""
# revision identifiers, used by Alembic.
revision = '1b837a7125a9'
down_revision = '6be312499f9'
migration_for_plugins = [
'neutron.plugins.ml2.plugin.Ml2Plugin'
]
from alembic import op
import sqlalchemy as sa
from neutron.db import migration
def upgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.create_table(
'cisco_ml2_apic_epgs',
sa.Column('network_id', sa.String(length=255), nullable=False),
sa.Column('epg_id', sa.String(length=64), nullable=False),
sa.Column('segmentation_id', sa.String(length=64), nullable=False),
sa.Column('provider', sa.Boolean(), default=False, nullable=False),
sa.PrimaryKeyConstraint('network_id'))
op.create_table(
'cisco_ml2_apic_port_profiles',
sa.Column('node_id', sa.String(length=255), nullable=False),
sa.Column('profile_id', sa.String(length=64), nullable=False),
sa.Column('hpselc_id', sa.String(length=64), nullable=False),
sa.Column('module', sa.String(length=10), nullable=False),
sa.Column('from_port', sa.Integer(), nullable=False),
sa.Column('to_port', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('node_id'))
op.create_table(
'cisco_ml2_apic_contracts',
sa.Column('tenant_id', sa.String(length=255), nullable=False),
sa.Column('contract_id', sa.String(length=64), nullable=False),
sa.Column('filter_id', sa.String(length=64), nullable=False),
sa.PrimaryKeyConstraint('tenant_id'))
def downgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.drop_table('cisco_ml2_apic_contracts')
op.drop_table('cisco_ml2_apic_port_profiles')
op.drop_table('cisco_ml2_apic_epgs')

@ -1 +1 @@
6be312499f9
1b837a7125a9

@ -0,0 +1,416 @@
# Copyright (c) 2014 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Henry Gessau, Cisco Systems
import collections
import time
import requests
import requests.exceptions
from neutron.openstack.common import jsonutils as json
from neutron.openstack.common import log as logging
from neutron.plugins.ml2.drivers.cisco.apic import exceptions as cexc
LOG = logging.getLogger(__name__)
APIC_CODE_FORBIDDEN = str(requests.codes.forbidden)
# Info about a Managed Object's relative name (RN) and container.
class ManagedObjectName(collections.namedtuple(
'MoPath', ['container', 'rn_fmt', 'can_create'])):
def __new__(cls, container, rn_fmt, can_create=True):
return super(ManagedObjectName, cls).__new__(cls, container, rn_fmt,
can_create)
class ManagedObjectClass(object):
"""Information about a Managed Object (MO) class.
Constructs and keeps track of the distinguished name (DN) and relative
name (RN) of a managed object (MO) class. The DN is the RN of the MO
appended to the recursive RNs of its containers, i.e.:
DN = uni/container-RN/.../container-RN/object-RN
Also keeps track of whether the MO can be created in the APIC, as some
MOs are read-only or used for specifying relationships.
"""
supported_mos = {
'fvTenant': ManagedObjectName(None, 'tn-%s'),
'fvBD': ManagedObjectName('fvTenant', 'BD-%s'),
'fvRsBd': ManagedObjectName('fvAEPg', 'rsbd'),
'fvSubnet': ManagedObjectName('fvBD', 'subnet-[%s]'),
'fvCtx': ManagedObjectName('fvTenant', 'ctx-%s'),
'fvRsCtx': ManagedObjectName('fvBD', 'rsctx'),
'fvAp': ManagedObjectName('fvTenant', 'ap-%s'),
'fvAEPg': ManagedObjectName('fvAp', 'epg-%s'),
'fvRsProv': ManagedObjectName('fvAEPg', 'rsprov-%s'),
'fvRsCons': ManagedObjectName('fvAEPg', 'rscons-%s'),
'fvRsConsIf': ManagedObjectName('fvAEPg', 'rsconsif-%s'),
'fvRsDomAtt': ManagedObjectName('fvAEPg', 'rsdomAtt-[%s]'),
'fvRsPathAtt': ManagedObjectName('fvAEPg', 'rspathAtt-[%s]'),
'vzBrCP': ManagedObjectName('fvTenant', 'brc-%s'),
'vzSubj': ManagedObjectName('vzBrCP', 'subj-%s'),
'vzFilter': ManagedObjectName('fvTenant', 'flt-%s'),
'vzRsFiltAtt': ManagedObjectName('vzSubj', 'rsfiltAtt-%s'),
'vzEntry': ManagedObjectName('vzFilter', 'e-%s'),
'vzInTerm': ManagedObjectName('vzSubj', 'intmnl'),
'vzRsFiltAtt__In': ManagedObjectName('vzInTerm', 'rsfiltAtt-%s'),
'vzOutTerm': ManagedObjectName('vzSubj', 'outtmnl'),
'vzRsFiltAtt__Out': ManagedObjectName('vzOutTerm', 'rsfiltAtt-%s'),
'vzCPIf': ManagedObjectName('fvTenant', 'cif-%s'),
'vzRsIf': ManagedObjectName('vzCPIf', 'rsif'),
'vmmProvP': ManagedObjectName(None, 'vmmp-%s', False),
'vmmDomP': ManagedObjectName('vmmProvP', 'dom-%s'),
'vmmEpPD': ManagedObjectName('vmmDomP', 'eppd-[%s]'),
'physDomP': ManagedObjectName(None, 'phys-%s'),
'infra': ManagedObjectName(None, 'infra'),
'infraNodeP': ManagedObjectName('infra', 'nprof-%s'),
'infraLeafS': ManagedObjectName('infraNodeP', 'leaves-%s-typ-%s'),
'infraNodeBlk': ManagedObjectName('infraLeafS', 'nodeblk-%s'),
'infraRsAccPortP': ManagedObjectName('infraNodeP', 'rsaccPortP-[%s]'),
'infraAccPortP': ManagedObjectName('infra', 'accportprof-%s'),
'infraHPortS': ManagedObjectName('infraAccPortP', 'hports-%s-typ-%s'),
'infraPortBlk': ManagedObjectName('infraHPortS', 'portblk-%s'),
'infraRsAccBaseGrp': ManagedObjectName('infraHPortS', 'rsaccBaseGrp'),
'infraFuncP': ManagedObjectName('infra', 'funcprof'),
'infraAccPortGrp': ManagedObjectName('infraFuncP', 'accportgrp-%s'),
'infraRsAttEntP': ManagedObjectName('infraAccPortGrp', 'rsattEntP'),
'infraAttEntityP': ManagedObjectName('infra', 'attentp-%s'),
'infraRsDomP': ManagedObjectName('infraAttEntityP', 'rsdomP-[%s]'),
'infraRsVlanNs__phys': ManagedObjectName('physDomP', 'rsvlanNs'),
'infraRsVlanNs__vmm': ManagedObjectName('vmmDomP', 'rsvlanNs'),
'fvnsVlanInstP': ManagedObjectName('infra', 'vlanns-%s-%s'),
'fvnsEncapBlk__vlan': ManagedObjectName('fvnsVlanInstP',
'from-%s-to-%s'),
'fvnsVxlanInstP': ManagedObjectName('infra', 'vxlanns-%s'),
'fvnsEncapBlk__vxlan': ManagedObjectName('fvnsVxlanInstP',
'from-%s-to-%s'),
# Read-only
'fabricTopology': ManagedObjectName(None, 'topology', False),
'fabricPod': ManagedObjectName('fabricTopology', 'pod-%s', False),
'fabricPathEpCont': ManagedObjectName('fabricPod', 'paths-%s', False),
'fabricPathEp': ManagedObjectName('fabricPathEpCont', 'pathep-%s',
False),
}
# Note(Henry): The use of a mutable default argument _inst_cache is
# intentional. It persists for the life of MoClass to cache instances.
# noinspection PyDefaultArgument
def __new__(cls, mo_class, _inst_cache={}):
"""Ensure we create only one instance per mo_class."""
try:
return _inst_cache[mo_class]
except KeyError:
new_inst = super(ManagedObjectClass, cls).__new__(cls)
new_inst.__init__(mo_class)
_inst_cache[mo_class] = new_inst
return new_inst
def __init__(self, mo_class):
self.klass = mo_class
self.klass_name = mo_class.split('__')[0]
mo = self.supported_mos[mo_class]
self.container = mo.container
self.rn_fmt = mo.rn_fmt
self.dn_fmt, self.args = self._dn_fmt()
self.arg_count = self.dn_fmt.count('%s')
rn_has_arg = self.rn_fmt.count('%s')
self.can_create = rn_has_arg and mo.can_create
def _dn_fmt(self):
"""Build the distinguished name format using container and RN.
DN = uni/container-RN/.../container-RN/object-RN
Also make a list of the required name arguments.
Note: Call this method only once at init.
"""
arg = [self.klass] if '%s' in self.rn_fmt else []
if self.container:
container = ManagedObjectClass(self.container)
dn_fmt = '%s/%s' % (container.dn_fmt, self.rn_fmt)
args = container.args + arg
return dn_fmt, args
return 'uni/%s' % self.rn_fmt, arg
def dn(self, *args):
"""Return the distinguished name for a managed object."""
return self.dn_fmt % args
class ApicSession(object):
"""Manages a session with the APIC."""
def __init__(self, host, port, usr, pwd, ssl):
protocol = ssl and 'https' or 'http'
self.api_base = '%s://%s:%s/api' % (protocol, host, port)
self.session = requests.Session()
self.session_deadline = 0
self.session_timeout = 0
self.cookie = {}
# Log in
self.authentication = None
self.username = None
self.password = None
if usr and pwd:
self.login(usr, pwd)
@staticmethod
def _make_data(key, **attrs):
"""Build the body for a msg out of a key and some attributes."""
return json.dumps({key: {'attributes': attrs}})
def _api_url(self, api):
"""Create the URL for a generic API."""
return '%s/%s.json' % (self.api_base, api)
def _mo_url(self, mo, *args):
"""Create a URL for a MO lookup by DN."""
dn = mo.dn(*args)
return '%s/mo/%s.json' % (self.api_base, dn)
def _qry_url(self, mo):
"""Create a URL for a query lookup by MO class."""
return '%s/class/%s.json' % (self.api_base, mo.klass_name)
def _check_session(self):
"""Check that we are logged in and ensure the session is active."""
if not self.authentication:
raise cexc.ApicSessionNotLoggedIn
if time.time() > self.session_deadline:
self.refresh()
def _send(self, request, url, data=None, refreshed=None):
"""Send a request and process the response."""
if data is None:
response = request(url, cookies=self.cookie)
else:
response = request(url, data=data, cookies=self.cookie)
if response is None:
raise cexc.ApicHostNoResponse(url=url)
# Every request refreshes the timeout
self.session_deadline = time.time() + self.session_timeout
if data is None:
request_str = url
else:
request_str = '%s, data=%s' % (url, data)
LOG.debug(_("data = %s"), data)
# imdata is where the APIC returns the useful information
imdata = response.json().get('imdata')
LOG.debug(_("Response: %s"), imdata)
if response.status_code != requests.codes.ok:
try:
err_code = imdata[0]['error']['attributes']['code']
err_text = imdata[0]['error']['attributes']['text']
except (IndexError, KeyError):
err_code = '[code for APIC error not found]'
err_text = '[text for APIC error not found]'
# If invalid token then re-login and retry once
if (not refreshed and err_code == APIC_CODE_FORBIDDEN and
err_text.lower().startswith('token was invalid')):
self.login()
return self._send(request, url, data=data, refreshed=True)
raise cexc.ApicResponseNotOk(request=request_str,
status=response.status_code,
reason=response.reason,
err_text=err_text, err_code=err_code)
return imdata
# REST requests
def get_data(self, request):
"""Retrieve generic data from the server."""
self._check_session()
url = self._api_url(request)
return self._send(self.session.get, url)
def get_mo(self, mo, *args):
"""Retrieve a managed object by its distinguished name."""
self._check_session()
url = self._mo_url(mo, *args) + '?query-target=self'
return self._send(self.session.get, url)
def list_mo(self, mo):
"""Retrieve the list of managed objects for a class."""
self._check_session()
url = self._qry_url(mo)
return self._send(self.session.get, url)
def post_data(self, request, data):
"""Post generic data to the server."""
self._check_session()
url = self._api_url(request)
return self._send(self.session.post, url, data=data)
def post_mo(self, mo, *args, **kwargs):
"""Post data for a managed object to the server."""
self._check_session()
url = self._mo_url(mo, *args)
data = self._make_data(mo.klass_name, **kwargs)
return self._send(self.session.post, url, data=data)
# Session management
def _save_cookie(self, request, response):
"""Save the session cookie and its expiration time."""
imdata = response.json().get('imdata')
if response.status_code == requests.codes.ok:
attributes = imdata[0]['aaaLogin']['attributes']
try:
self.cookie = {'APIC-Cookie': attributes['token']}
except KeyError:
raise cexc.ApicResponseNoCookie(request=request)
timeout = int(attributes['refreshTimeoutSeconds'])
LOG.debug(_("APIC session will expire in %d seconds"), timeout)
# Give ourselves a few seconds to refresh before timing out
self.session_timeout = timeout - 5
self.session_deadline = time.time() + self.session_timeout
else:
attributes = imdata[0]['error']['attributes']
return attributes
def login(self, usr=None, pwd=None):
"""Log in to controller. Save user name and authentication."""
usr = usr or self.username
pwd = pwd or self.password
name_pwd = self._make_data('aaaUser', name=usr, pwd=pwd)
url = self._api_url('aaaLogin')
try:
response = self.session.post(url, data=name_pwd, timeout=10.0)
except requests.exceptions.Timeout:
raise cexc.ApicHostNoResponse(url=url)
attributes = self._save_cookie('aaaLogin', response)
if response.status_code == requests.codes.ok:
self.username = usr
self.password = pwd
self.authentication = attributes
else:
self.authentication = None
raise cexc.ApicResponseNotOk(request=url,
status=response.status_code,
reason=response.reason,
err_text=attributes['text'],
err_code=attributes['code'])
def refresh(self):
"""Called when a session has timed out or almost timed out."""
url = self._api_url('aaaRefresh')
response = self.session.get(url, cookies=self.cookie)
attributes = self._save_cookie('aaaRefresh', response)
if response.status_code == requests.codes.ok:
# We refreshed before the session timed out.
self.authentication = attributes
else:
err_code = attributes['code']
err_text = attributes['text']
if (err_code == APIC_CODE_FORBIDDEN and
err_text.lower().startswith('token was invalid')):
# This means the token timed out, so log in again.
LOG.debug(_("APIC session timed-out, logging in again."))
self.login()
else:
self.authentication = None
raise cexc.ApicResponseNotOk(request=url,
status=response.status_code,
reason=response.reason,
err_text=err_text,
err_code=err_code)
def logout(self):
"""End session with controller."""
if not self.username:
self.authentication = None
if self.authentication:
data = self._make_data('aaaUser', name=self.username)
self.post_data('aaaLogout', data=data)
self.authentication = None
class ManagedObjectAccess(object):
"""CRUD operations on APIC Managed Objects."""
def __init__(self, session, mo_class):
self.session = session
self.mo = ManagedObjectClass(mo_class)
def _create_container(self, *args):
"""Recursively create all container objects."""
if self.mo.container:
container = ManagedObjectAccess(self.session, self.mo.container)
if container.mo.can_create:
container_args = args[0: container.mo.arg_count]
container._create_container(*container_args)
container.session.post_mo(container.mo, *container_args)
def create(self, *args, **kwargs):
self._create_container(*args)
if self.mo.can_create and 'status' not in kwargs:
kwargs['status'] = 'created'
return self.session.post_mo(self.mo, *args, **kwargs)
def _mo_attributes(self, obj_data):
if (self.mo.klass_name in obj_data and
'attributes' in obj_data[self.mo.klass_name]):
return obj_data[self.mo.klass_name]['attributes']
def get(self, *args):
"""Return a dict of the MO's attributes, or None."""
imdata = self.session.get_mo(self.mo, *args)
if imdata:
return self._mo_attributes(imdata[0])
def list_all(self):
imdata = self.session.list_mo(self.mo)
return filter(None, [self._mo_attributes(obj) for obj in imdata])
def list_names(self):
return [obj['name'] for obj in self.list_all()]
def update(self, *args, **kwargs):
return self.session.post_mo(self.mo, *args, **kwargs)
def delete(self, *args):
return self.session.post_mo(self.mo, *args, status='deleted')
class RestClient(ApicSession):
"""APIC REST client for OpenStack Neutron."""
def __init__(self, host, port=80, usr=None, pwd=None, ssl=False):
"""Establish a session with the APIC."""
super(RestClient, self).__init__(host, port, usr, pwd, ssl)
def __getattr__(self, mo_class):
"""Add supported MOs as properties on demand."""
if mo_class not in ManagedObjectClass.supported_mos:
raise cexc.ApicManagedObjectNotSupported(mo_class=mo_class)
self.__dict__[mo_class] = ManagedObjectAccess(self, mo_class)
return self.__dict__[mo_class]

@ -0,0 +1,177 @@
# Copyright (c) 2014 Cisco Systems Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Arvind Somya (asomya@cisco.com), Cisco Systems Inc.
import sqlalchemy as sa
from neutron.db import api as db_api
from neutron.db import model_base
from neutron.db import models_v2
class NetworkEPG(model_base.BASEV2):
"""EPG's created on the apic per network."""
__tablename__ = 'cisco_ml2_apic_epgs'
network_id = sa.Column(sa.String(255), nullable=False, primary_key=True)
epg_id = sa.Column(sa.String(64), nullable=False)
segmentation_id = sa.Column(sa.String(64), nullable=False)
provider = sa.Column(sa.Boolean, default=False, nullable=False)
class PortProfile(model_base.BASEV2):
"""Port profiles created on the APIC."""
__tablename__ = 'cisco_ml2_apic_port_profiles'
node_id = sa.Column(sa.String(255), nullable=False, primary_key=True)
profile_id = sa.Column(sa.String(64), nullable=False)
hpselc_id = sa.Column(sa.String(64), nullable=False)
module = sa.Column(sa.String(10), nullable=False)
from_port = sa.Column(sa.Integer(), nullable=False)
to_port = sa.Column(sa.Integer(), nullable=False)
class TenantContract(model_base.BASEV2, models_v2.HasTenant):
"""Contracts (and Filters) created on the APIC."""
__tablename__ = 'cisco_ml2_apic_contracts'
__table_args__ = (sa.PrimaryKeyConstraint('tenant_id'),)
contract_id = sa.Column(sa.String(64), nullable=False)
filter_id = sa.Column(sa.String(64), nullable=False)
class ApicDbModel(object):
"""DB Model to manage all APIC DB interactions."""
def __init__(self):
self.session = db_api.get_session()
def get_port_profile_for_node(self, node_id):
"""Returns a port profile for a switch if found in the DB."""
return self.session.query(PortProfile).filter_by(
node_id=node_id).first()
def get_profile_for_module_and_ports(self, node_id, profile_id,
module, from_port, to_port):
"""Returns profile for module and ports.
Grabs the profile row from the DB for the specified switch,
module (linecard) and from/to port combination.
"""
return self.session.query(PortProfile).filter_by(
node_id=node_id,
module=module,
profile_id=profile_id,
from_port=from_port,
to_port=to_port).first()
def get_profile_for_module(self, node_id, profile_id, module):
"""Returns the first profile for a switch module from the DB."""
return self.session.query(PortProfile).filter_by(
node_id=node_id,
profile_id=profile_id,
module=module).first()
def add_profile_for_module_and_ports(self, node_id, profile_id,
hpselc_id, module,
from_port, to_port):
"""Adds a profile for switch, module and port range."""
row = PortProfile(node_id=node_id, profile_id=profile_id,
hpselc_id=hpselc_id, module=module,
from_port=from_port, to_port=to_port)
self.session.add(row)
self.session.flush()
def get_provider_contract(self):
"""Returns provider EPG from the DB if found."""
return self.session.query(NetworkEPG).filter_by(
provider=True).first()
def set_provider_contract(self, epg_id):
"""Sets an EPG to be a contract provider."""
epg = self.session.query(NetworkEPG).filter_by(
epg_id=epg_id).first()
if epg:
epg.provider = True
self.session.merge(epg)
self.session.flush()
def unset_provider_contract(self, epg_id):
"""Sets an EPG to be a contract consumer."""
epg = self.session.query(NetworkEPG).filter_by(
epg_id=epg_id).first()
if epg:
epg.provider = False
self.session.merge(epg)
self.session.flush()
def get_an_epg(self, exception):
"""Returns an EPG from the DB that does not match the id specified."""
return self.session.query(NetworkEPG).filter(
NetworkEPG.epg_id != exception).first()
def get_epg_for_network(self, network_id):
"""Returns an EPG for a give neutron network."""
return self.session.query(NetworkEPG).filter_by(
network_id=network_id).first()
def write_epg_for_network(self, network_id, epg_uid, segmentation_id='1'):
"""Stores EPG details for a network.
NOTE: Segmentation_id is just a placeholder currently, it will be
populated with a proper segment id once segmentation mgmt is
moved to the APIC.
"""
epg = NetworkEPG(network_id=network_id, epg_id=epg_uid,
segmentation_id=segmentation_id)
self.session.add(epg)
self.session.flush()
return epg
def delete_epg(self, epg):
"""Deletes an EPG from the DB."""
self.session.delete(epg)
self.session.flush()
def get_contract_for_tenant(self, tenant_id):
"""Returns the specified tenant's contract."""
return self.session.query(TenantContract).filter_by(
tenant_id=tenant_id).first()
def write_contract_for_tenant(self, tenant_id, contract_id, filter_id):
"""Stores a new contract for the given tenant."""
contract = TenantContract(tenant_id=tenant_id,
contract_id=contract_id,
filter_id=filter_id)
self.session.add(contract)
self.session.flush()
return contract
def delete_profile_for_node(self, node_id):
"""Deletes the port profile for a node."""
profile = self.session.query(PortProfile).filter_by(
node_id=node_id).first()
if profile:
self.session.delete(profile)
self.session.flush()

@ -0,0 +1,82 @@
# Copyright (c) 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Arvind Somya (asomya@cisco.com), Cisco Systems Inc.
from oslo.config import cfg
apic_opts = [
cfg.StrOpt('apic_host',
help=_("Host name or IP Address of the APIC controller")),
cfg.StrOpt('apic_username',
help=_("Username for the APIC controller")),
cfg.StrOpt('apic_password',
help=_("Password for the APIC controller"), secret=True),
cfg.StrOpt('apic_port',
help=_("Communication port for the APIC controller")),
cfg.StrOpt('apic_vmm_provider', default='VMware',
help=_("Name for the VMM domain provider")),
cfg.StrOpt('apic_vmm_domain', default='openstack',
help=_("Name for the VMM domain to be created for Openstack")),
cfg.StrOpt('apic_vlan_ns_name', default='openstack_ns',
help=_("Name for the vlan namespace to be used for openstack")),
cfg.StrOpt('apic_vlan_range', default='2:4093',
help=_("Range of VLAN's to be used for Openstack")),
cfg.StrOpt('apic_node_profile', default='openstack_profile',
help=_("Name of the node profile to be created")),
cfg.StrOpt('apic_entity_profile', default='openstack_entity',
help=_("Name of the entity profile to be created")),
cfg.StrOpt('apic_function_profile', default='openstack_function',
help=_("Name of the function profile to be created")),
cfg.BoolOpt('apic_clear_node_profiles', default=False,
help=_("Clear the node profiles on the APIC at startup "
"(mainly used for testing)")),
]
cfg.CONF.register_opts(apic_opts, "ml2_cisco_apic")
def get_switch_and_port_for_host(host_id):
for switch, connected in _switch_dict.items():
for port, hosts in connected.items():
if host_id in hosts:
return switch, port
_switch_dict = {}
def create_switch_dictionary():
multi_parser = cfg.MultiConfigParser()
read_ok = multi_parser.read(cfg.CONF.config_file)
if len(read_ok) != len(cfg.CONF.config_file):
raise cfg.Error(_("Some config files were not parsed properly"))
for parsed_file in multi_parser.parsed:
for parsed_item in parsed_file.keys():
if parsed_item.startswith('apic_switch'):
switch, switch_id = parsed_item.split(':')
if switch.lower() == 'apic_switch':
_switch_dict[switch_id] = {}
port_cfg = parsed_file[parsed_item].items()
for host_list, port in port_cfg:
hosts = host_list.split(',')
port = port[0]
_switch_dict[switch_id][port] = hosts
return _switch_dict

@ -0,0 +1,52 @@
# Copyright (c) 2014 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Henry Gessau, Cisco Systems
"""Exceptions used by Cisco APIC ML2 mechanism driver."""
from neutron.common import exceptions
class ApicHostNoResponse(exceptions.NotFound):
"""No response from the APIC via the specified URL."""
message = _("No response from APIC at %(url)s")
class ApicResponseNotOk(exceptions.NeutronException):
"""A response from the APIC was not HTTP OK."""
message = _("APIC responded with HTTP status %(status)s: %(reason)s, "
"Request: '%(request)s', "
"APIC error code %(err_code)s: %(err_text)s")
class ApicResponseNoCookie(exceptions.NeutronException):
"""A response from the APIC did not contain an expected cookie."""
message = _("APIC failed to provide cookie for %(request)s request")
class ApicSessionNotLoggedIn(exceptions.NotAuthorized):
"""Attempted APIC operation while not logged in to APIC."""
message = _("Authorized APIC session not established")
class ApicHostNotConfigured(exceptions.NotAuthorized):
"""The switch and port for the specified host are not configured."""
message = _("The switch and port for host '%(host)s' are not configured")
class ApicManagedObjectNotSupported(exceptions.NeutronException):
"""Attempted to use an unsupported Managed Object."""
message = _("Managed Object '%(mo_class)s' is not supported")

@ -0,0 +1,272 @@
# Copyright (c) 2014 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Henry Gessau, Cisco Systems
import mock
import requests
import requests.exceptions
from neutron.plugins.ml2.drivers.cisco.apic import apic_client as apic
from neutron.plugins.ml2.drivers.cisco.apic import exceptions as cexc
from neutron.tests import base
from neutron.tests.unit.ml2.drivers.cisco.apic import (
test_cisco_apic_common as mocked)
class TestCiscoApicClient(base.BaseTestCase, mocked.ControllerMixin):
def setUp(self):
super(TestCiscoApicClient, self).setUp()
self.set_up_mocks()
self.apic = apic.RestClient(mocked.APIC_HOST)
self.addCleanup(mock.patch.stopall)
def _mock_authenticate(self, timeout=300):
self.reset_reponses()
self.mock_apic_manager_login_responses(timeout=timeout)
self.apic.login(mocked.APIC_USR, mocked.APIC_PWD)
def test_login_by_instantiation(self):
self.reset_reponses()
self.mock_apic_manager_login_responses()
apic2 = apic.RestClient(mocked.APIC_HOST,
usr=mocked.APIC_USR, pwd=mocked.APIC_PWD)
self.assertIsNotNone(apic2.authentication)
self.assertEqual(apic2.username, mocked.APIC_USR)
def test_client_session_login_ok(self):
self._mock_authenticate()
self.assertEqual(
self.apic.authentication['userName'], mocked.APIC_USR)
self.assertTrue(self.apic.api_base.startswith('http://'))
self.assertEqual(self.apic.username, mocked.APIC_USR)
self.assertIsNotNone(self.apic.authentication)
self.apic = apic.RestClient(mocked.APIC_HOST, mocked.APIC_PORT,
ssl=True)
self.assertTrue(self.apic.api_base.startswith('https://'))
def test_client_session_login_fail(self):
self.mock_error_post_response(requests.codes.unauthorized,
code='599',
text=u'Fake error')
self.assertRaises(cexc.ApicResponseNotOk, self.apic.login,
mocked.APIC_USR, mocked.APIC_PWD)
def test_client_session_login_timeout(self):
self.response['post'].append(requests.exceptions.Timeout)
self.assertRaises(cexc.ApicHostNoResponse, self.apic.login,
mocked.APIC_USR, mocked.APIC_PWD)
def test_client_session_logout_ok(self):
self.mock_response_for_post('aaaLogout')
self.apic.logout()
self.assertIsNone(self.apic.authentication)
# Multiple signouts should not cause an error
self.apic.logout()
self.assertIsNone(self.apic.authentication)
def test_client_session_logout_fail(self):
self._mock_authenticate()
self.mock_error_post_response(requests.codes.timeout,
code='123', text='failed')
self.assertRaises(cexc.ApicResponseNotOk, self.apic.logout)
def test_query_not_logged_in(self):
self.apic.authentication = None
self.assertRaises(cexc.ApicSessionNotLoggedIn,
self.apic.fvTenant.get, mocked.APIC_TENANT)
def test_query_no_response(self):
self._mock_authenticate()
requests.Session.get = mock.Mock(return_value=None)
self.assertRaises(cexc.ApicHostNoResponse,
self.apic.fvTenant.get, mocked.APIC_TENANT)
def test_query_error_response_no_data(self):
self._mock_authenticate()
self.mock_error_get_response(requests.codes.bad) # No error attrs.
self.assertRaises(cexc.ApicResponseNotOk,
self.apic.fvTenant.get, mocked.APIC_TENANT)
def test_generic_get_data(self):
self._mock_authenticate()
self.mock_response_for_get('topSystem', name='ifc1')
top_system = self.apic.get_data('class/topSystem')
self.assertIsNotNone(top_system)
name = top_system[0]['topSystem']['attributes']['name']
self.assertEqual(name, 'ifc1')
def test_session_timeout_refresh_ok(self):
self._mock_authenticate(timeout=-1)
# Client will do refresh before getting tenant
self.mock_response_for_get('aaaLogin', token='ok',
refreshTimeoutSeconds=300)
self.mock_response_for_get('fvTenant', name=mocked.APIC_TENANT)
tenant = self.apic.fvTenant.get(mocked.APIC_TENANT)
self.assertEqual(tenant['name'], mocked.APIC_TENANT)
def test_session_timeout_refresh_no_cookie(self):
self._mock_authenticate(timeout=-1)
# Client will do refresh before getting tenant
self.mock_response_for_get('aaaLogin', notoken='test')
self.assertRaises(cexc.ApicResponseNoCookie,
self.apic.fvTenant.get, mocked.APIC_TENANT)
def test_session_timeout_refresh_error(self):
self._mock_authenticate(timeout=-1)
self.mock_error_get_response(requests.codes.timeout,
code='503', text=u'timed out')
self.assertRaises(cexc.ApicResponseNotOk,
self.apic.fvTenant.get, mocked.APIC_TENANT)
def test_session_timeout_refresh_timeout_error(self):
self._mock_authenticate(timeout=-1)
# Client will try to get refresh, we fake a refresh error.
self.mock_error_get_response(requests.codes.bad_request,
code='403',
text=u'Token was invalid. Expired.')
# Client will then try to re-login.
self.mock_apic_manager_login_responses()
# Finally the client will try to get the tenant.
self.mock_response_for_get('fvTenant', name=mocked.APIC_TENANT)
tenant = self.apic.fvTenant.get(mocked.APIC_TENANT)
self.assertEqual(tenant['name'], mocked.APIC_TENANT)
def test_lookup_mo_bad_token_retry(self):
self._mock_authenticate()
# For the first get request we mock a bad token.
self.mock_error_get_response(requests.codes.bad_request,
code='403',
text=u'Token was invalid. Expired.')
# Client will then try to re-login.
self.mock_apic_manager_login_responses()
# Then the client will retry to get the tenant.
self.mock_response_for_get('fvTenant', name=mocked.APIC_TENANT)
tenant = self.apic.fvTenant.get(mocked.APIC_TENANT)
self.assertEqual(tenant['name'], mocked.APIC_TENANT)
def test_use_unsupported_managed_object(self):
self._mock_authenticate()
# unittest.assertRaises cannot catch exceptions raised in
# __getattr__, so we need to defer the evaluation using lambda.
self.assertRaises(cexc.ApicManagedObjectNotSupported,
lambda: self.apic.nonexistentObject)
def test_lookup_nonexistant_mo(self):
self._mock_authenticate()
self.mock_response_for_get('fvTenant')
self.assertIsNone(self.apic.fvTenant.get(mocked.APIC_TENANT))
def test_lookup_existing_mo(self):
self._mock_authenticate()
self.mock_response_for_get('fvTenant', name='infra')
tenant = self.apic.fvTenant.get('infra')
self.assertEqual(tenant['name'], 'infra')
def test_list_mos_ok(self):
self._mock_authenticate()
self.mock_response_for_get('fvTenant', name='t1')
self.mock_append_to_response('fvTenant', name='t2')
tlist = self.apic.fvTenant.list_all()
self.assertIsNotNone(tlist)
self.assertEqual(len(tlist), 2)
self.assertIn({'name': 't1'}, tlist)
self.assertIn({'name': 't2'}, tlist)
def test_list_mo_names_ok(self):
self._mock_authenticate()
self.mock_response_for_get('fvTenant', name='t1')
self.mock_append_to_response('fvTenant', name='t2')
tnlist = self.apic.fvTenant.list_names()
self.assertIsNotNone(tnlist)
self.assertEqual(len(tnlist), 2)
self.assertIn('t1', tnlist)
self.assertIn('t2', tnlist)
def test_list_mos_split_class_fail(self):
self._mock_authenticate()
self.mock_response_for_get('fvnsEncapBlk', name='Blk1')
encap_blks = self.apic.fvnsEncapBlk__vlan.list_all()
self.assertEqual(len(encap_blks), 1)
def test_delete_mo_ok(self):
self._mock_authenticate()
self.mock_response_for_post('fvTenant')
self.assertTrue(self.apic.fvTenant.delete(mocked.APIC_TENANT))
def test_create_mo_ok(self):
self._mock_authenticate()
self.mock_response_for_post('fvTenant', name=mocked.APIC_TENANT)
self.mock_response_for_get('fvTenant', name=mocked.APIC_TENANT)
self.apic.fvTenant.create(mocked.APIC_TENANT)
tenant = self.apic.fvTenant.get(mocked.APIC_TENANT)
self.assertEqual(tenant['name'], mocked.APIC_TENANT)
def test_create_mo_already_exists(self):
self._mock_authenticate()
self.mock_error_post_response(requests.codes.bad_request,
code='103',
text=u'Fake 103 error')
self.assertRaises(cexc.ApicResponseNotOk,
self.apic.vmmProvP.create, mocked.APIC_VMMP)
def test_create_mo_with_prereq(self):
self._mock_authenticate()
self.mock_response_for_post('fvTenant', name=mocked.APIC_TENANT)
self.mock_response_for_post('fvBD', name=mocked.APIC_NETWORK)
self.mock_response_for_get('fvBD', name=mocked.APIC_NETWORK)
bd_args = mocked.APIC_TENANT, mocked.APIC_NETWORK
self.apic.fvBD.create(*bd_args)
network = self.apic.fvBD.get(*bd_args)
self.assertEqual(network['name'], mocked.APIC_NETWORK)
def test_create_mo_prereq_exists(self):
self._mock_authenticate()
self.mock_response_for_post('vmmDomP', name=mocked.APIC_DOMAIN)
self.mock_response_for_get('vmmDomP', name=mocked.APIC_DOMAIN)
self.apic.vmmDomP.create(mocked.APIC_VMMP, mocked.APIC_DOMAIN)
dom = self.apic.vmmDomP.get(mocked.APIC_VMMP, mocked.APIC_DOMAIN)
self.assertEqual(dom['name'], mocked.APIC_DOMAIN)
def test_create_mo_fails(self):
self._mock_authenticate()
self.mock_response_for_post('fvTenant', name=mocked.APIC_TENANT)
self.mock_error_post_response(requests.codes.bad_request,
code='not103',
text=u'Fake not103 error')
bd_args = mocked.APIC_TENANT, mocked.APIC_NETWORK
self.assertRaises(cexc.ApicResponseNotOk,
self.apic.fvBD.create, *bd_args)
def test_update_mo(self):
self._mock_authenticate()
self.mock_response_for_post('fvTenant', name=mocked.APIC_TENANT)
self.mock_response_for_get('fvTenant', name=mocked.APIC_TENANT,
more='extra')
self.apic.fvTenant.update(mocked.APIC_TENANT, more='extra')
tenant = self.apic.fvTenant.get(mocked.APIC_TENANT)
self.assertEqual(tenant['name'], mocked.APIC_TENANT)
self.assertEqual(tenant['more'], 'extra')
def test_attr_fail_empty_list(self):
self._mock_authenticate()
self.mock_response_for_get('fvTenant') # No attrs for tenant.
self.assertIsNone(self.apic.fvTenant.get(mocked.APIC_TENANT))
def test_attr_fail_other_obj(self):
self._mock_authenticate()
self.mock_response_for_get('other', name=mocked.APIC_TENANT)
self.assertIsNone(self.apic.fvTenant.get(mocked.APIC_TENANT))

@ -0,0 +1,225 @@
# Copyright (c) 2014 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Henry Gessau, Cisco Systems
import mock
import requests
from oslo.config import cfg
from neutron.common import config as neutron_config
from neutron.plugins.ml2 import config as ml2_config
from neutron.plugins.ml2.drivers.cisco.apic import apic_client as apic
from neutron.tests.unit import test_api_v2
OK = requests.codes.ok
APIC_HOST = 'fake.controller.local'
APIC_PORT = 7580
APIC_USR = 'notadmin'
APIC_PWD = 'topsecret'
APIC_TENANT = 'citizen14'
APIC_NETWORK = 'network99'
APIC_NETNAME = 'net99name'
APIC_SUBNET = '10.3.2.1/24'
APIC_L3CTX = 'layer3context'
APIC_AP = 'appProfile001'
APIC_EPG = 'endPointGroup001'
APIC_CONTRACT = 'signedContract'
APIC_SUBJECT = 'testSubject'
APIC_FILTER = 'carbonFilter'
APIC_ENTRY = 'forcedEntry'
APIC_VMMP = 'OpenStack'
APIC_DOMAIN = 'cumuloNimbus'
APIC_PDOM = 'rainStorm'
APIC_NODE_PROF = 'red'
APIC_LEAF = 'green'
APIC_LEAF_TYPE = 'range'
APIC_NODE_BLK = 'blue'
APIC_PORT_PROF = 'yellow'
APIC_PORT_SEL = 'front'
APIC_PORT_TYPE = 'range'
APIC_PORT_BLK1 = 'block01'
APIC_PORT_BLK2 = 'block02'
APIC_ACC_PORT_GRP = 'alpha'
APIC_FUNC_PROF = 'beta'
APIC_ATT_ENT_PROF = 'delta'
APIC_VLAN_NAME = 'gamma'
APIC_VLAN_MODE = 'dynamic'
APIC_VLANID_FROM = 2900
APIC_VLANID_TO = 2999
APIC_VLAN_FROM = 'vlan-%d' % APIC_VLANID_FROM
APIC_VLAN_TO = 'vlan-%d' % APIC_VLANID_TO
class ControllerMixin(object):
"""Mock the controller for APIC driver and service unit tests."""
def __init__(self):
self.response = None
def set_up_mocks(self):
# The mocked responses from the server are lists used by
# mock.side_effect, which means each call to post or get will
# return the next item in the list. This allows the test cases
# to stage a sequence of responses to method(s) under test.
self.response = {'post': [], 'get': []}
self.reset_reponses()
def reset_reponses(self, req=None):
# Clear all staged responses.
reqs = req and [req] or ['post', 'get'] # Both if none specified.
for req in reqs:
del self.response[req][:]
self.restart_responses(req)
def restart_responses(self, req):
responses = mock.MagicMock(side_effect=self.response[req])
if req == 'post':
requests.Session.post = responses
elif req == 'get':
requests.Session.get = responses
def mock_response_for_post(self, mo, **attrs):
attrs['debug_mo'] = mo # useful for debugging
self._stage_mocked_response('post', OK, mo, **attrs)
def mock_response_for_get(self, mo, **attrs):
self._stage_mocked_response('get', OK, mo, **attrs)
def mock_append_to_response(self, mo, **attrs):
# Append a MO to the last get response.
mo_attrs = attrs and {mo: {'attributes': attrs}} or {}
self.response['get'][-1].json.return_value['imdata'].append(mo_attrs)
def mock_error_post_response(self, status, **attrs):
self._stage_mocked_response('post', status, 'error', **attrs)
def mock_error_get_response(self, status, **attrs):
self._stage_mocked_response('get', status, 'error', **attrs)
def _stage_mocked_response(self, req, mock_status, mo, **attrs):
response = mock.MagicMock()
response.status_code = mock_status
mo_attrs = attrs and [{mo: {'attributes': attrs}}] or []
response.json.return_value = {'imdata': mo_attrs}
self.response[req].append(response)
def mock_responses_for_create(self, obj):
self._mock_container_responses_for_create(
apic.ManagedObjectClass(obj).container)
name = '-'.join([obj, 'name']) # useful for debugging
self._stage_mocked_response('post', OK, obj, name=name)
def _mock_container_responses_for_create(self, obj):
# Recursively generate responses for creating obj's containers.
if obj:
mo = apic.ManagedObjectClass(obj)
if mo.can_create:
if mo.container:
self._mock_container_responses_for_create(mo.container)
name = '-'.join([obj, 'name']) # useful for debugging
self._stage_mocked_response('post', OK, obj, debug_name=name)
def mock_apic_manager_login_responses(self, timeout=300):
# APIC Manager tests are based on authenticated session
self.mock_response_for_post('aaaLogin', userName=APIC_USR,
token='ok', refreshTimeoutSeconds=timeout)
def assert_responses_drained(self, req=None):
"""Fail if all the expected responses have not been consumed."""
request = {'post': self.session.post, 'get': self.session.get}
reqs = req and [req] or ['post', 'get'] # Both if none specified.
for req in reqs:
try:
request[req]('some url')
except StopIteration:
pass
else:
# User-friendly error message
msg = req + ' response queue not drained'
self.fail(msg=msg)
class ConfigMixin(object):
"""Mock the config for APIC driver and service unit tests."""
def __init__(self):
self.mocked_parser = None
def set_up_mocks(self):
# Mock the configuration file
args = ['--config-file', test_api_v2.etcdir('neutron.conf.test')]
neutron_config.parse(args=args)
# Configure the ML2 mechanism drivers and network types
ml2_opts = {
'mechanism_drivers': ['apic'],
'tenant_network_types': ['vlan'],
}
for opt, val in ml2_opts.items():
ml2_config.cfg.CONF.set_override(opt, val, 'ml2')
# Configure the Cisco APIC mechanism driver
apic_test_config = {
'apic_host': APIC_HOST,
'apic_username': APIC_USR,
'apic_password': APIC_PWD,
'apic_port': APIC_PORT,
'apic_vmm_domain': APIC_DOMAIN,
'apic_vlan_ns_name': APIC_VLAN_NAME,
'apic_vlan_range': '%d:%d' % (APIC_VLANID_FROM, APIC_VLANID_TO),
'apic_node_profile': APIC_NODE_PROF,
'apic_entity_profile': APIC_ATT_ENT_PROF,
'apic_function_profile': APIC_FUNC_PROF,
}
for opt, val in apic_test_config.items():
cfg.CONF.set_override(opt, val, 'ml2_cisco_apic')
apic_switch_cfg = {
'apic_switch:east01': {'ubuntu1,ubuntu2': ['3/11']},
'apic_switch:east02': {'rhel01,rhel02': ['4/21'],
'rhel03': ['4/22']},
}
self.mocked_parser = mock.patch.object(cfg,
'MultiConfigParser').start()
self.mocked_parser.return_value.read.return_value = [apic_switch_cfg]
self.mocked_parser.return_value.parsed = [apic_switch_cfg]
class DbModelMixin(object):
"""Mock the DB models for the APIC driver and service unit tests."""
def __init__(self):
self.mocked_session = None
def set_up_mocks(self):
self.mocked_session = mock.Mock()
get_session = mock.patch('neutron.db.api.get_session').start()
get_session.return_value = self.mocked_session
def mock_db_query_filterby_first_return(self, value):
"""Mock db.session.query().filterby().first() to return value."""
query = self.mocked_session.query.return_value
query.filter_by.return_value.first.return_value = value