Switch to flake8 from pep8.

* flake8 supports more checks than pep8 (e.g. detection of
   unused imports and variables), and has an extension mechanism.
   A plugin to support automatic HACKING validation is planned.
 * See: http://flake8.readthedocs.org/

Change-Id: I8c9314c606802109a4d01908dbc74ecb792ad0ac
This commit is contained in:
Maru Newby 2013-04-04 04:10:23 +00:00
parent 55f2bc05a0
commit a9e763914d
112 changed files with 180 additions and 383 deletions
quantum
agent
api
common
db
plugins
scheduler
tests/unit

@ -129,7 +129,7 @@ class DhcpAgent(manager.Manager):
getattr(driver, action)()
return True
except Exception, e:
except Exception:
self.needs_resync = True
LOG.exception(_('Unable to %s dhcp.'), action)
@ -718,8 +718,7 @@ class DhcpAgentWithStateReport(DhcpAgent):
LOG.info(_("DHCP agent started"))
def main():
eventlet.monkey_patch()
def register_options():
cfg.CONF.register_opts(DhcpAgent.OPTS)
config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)
@ -727,6 +726,11 @@ def main():
cfg.CONF.register_opts(DhcpLeaseRelay.OPTS)
cfg.CONF.register_opts(dhcp.OPTS)
cfg.CONF.register_opts(interface.OPTS)
def main():
eventlet.monkey_patch()
register_options()
cfg.CONF(project='quantum')
config.setup_logging(cfg.CONF)
server = quantum_service.Service.create(

@ -31,7 +31,7 @@ class Pidfile(object):
def __init__(self, pidfile, procname, root_helper='sudo'):
try:
self.fd = os.open(pidfile, os.O_CREAT | os.O_RDWR)
except IOError, e:
except IOError:
LOG.exception(_("Failed to open pidfile: %s"), pidfile)
sys.exit(1)
self.pidfile = pidfile
@ -68,7 +68,7 @@ class Pidfile(object):
cmd = ['cat', '/proc/%s/cmdline' % pid]
try:
return self.procname in utils.execute(cmd, self.root_helper)
except RuntimeError, e:
except RuntimeError:
return False
@ -91,7 +91,7 @@ class Daemon(object):
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError, e:
except OSError:
LOG.exception(_('Fork failed'))
sys.exit(1)

@ -163,9 +163,9 @@ class DhcpLocalProcess(DhcpBase):
with open(file_name, 'r') as f:
try:
return converter and converter(f.read()) or f.read()
except ValueError, e:
except ValueError:
msg = _('Unable to convert value in %s')
except IOError, e:
except IOError:
msg = _('Unable to access %s')
LOG.debug(msg % file_name)
@ -185,7 +185,7 @@ class DhcpLocalProcess(DhcpBase):
cmd = ['cat', '/proc/%s/cmdline' % pid]
try:
return self.network.id in utils.execute(cmd, self.root_helper)
except RuntimeError, e:
except RuntimeError:
return False
@property

@ -86,9 +86,9 @@ class ProcessManager(object):
try:
with open(file_name, 'r') as f:
return int(f.read())
except IOError, e:
except IOError:
msg = _('Unable to access %s')
except ValueError, e:
except ValueError:
msg = _('Unable to convert value in %s')
LOG.debug(msg, file_name)
@ -103,5 +103,5 @@ class ProcessManager(object):
cmd = ['cat', '/proc/%s/cmdline' % pid]
try:
return self.uuid in utils.execute(cmd, self.root_helper)
except RuntimeError, e:
except RuntimeError:
return False

@ -24,8 +24,6 @@
import inspect
import os
from oslo.config import cfg
from quantum.agent.linux import utils
from quantum.openstack.common import lockutils
from quantum.openstack.common import log as logging

@ -89,7 +89,7 @@ class MetadataProxyHandler(object):
else:
return webob.exc.HTTPNotFound()
except Exception, e:
except Exception:
LOG.exception(_("Unexpected error."))
msg = _('An unknown error has occurred. '
'Please try your request again.')

@ -77,7 +77,7 @@ class NetworkMetadataProxyHandler(object):
return self._proxy_request(req.remote_addr,
req.path_info,
req.query_string)
except Exception, e:
except Exception:
LOG.exception(_("Unexpected error."))
msg = _('An unknown error has occurred. '
'Please try your request again.')

@ -139,7 +139,7 @@ def destroy_namespace(conf, namespace, force=False):
unplug_device(conf, device)
ip.garbage_collect_namespace()
except Exception, e:
except Exception:
LOG.exception(_('Error unable to destroy namespace: %s'), namespace)

@ -15,17 +15,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from quantum.common import topics
from quantum.openstack.common import log as logging
from quantum.openstack.common.notifier import api
from quantum.openstack.common.notifier import rpc_notifier
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.openstack.common import timeutils
from quantum.openstack.common import uuidutils
LOG = logging.getLogger(__name__)

@ -27,7 +27,6 @@ import webob.dec
import webob.exc
from quantum.api.v2 import attributes
from quantum.common import constants
from quantum.common import exceptions
import quantum.extensions
from quantum.manager import QuantumManager

@ -25,7 +25,6 @@ from oslo.config import cfg
from paste import deploy
from quantum.api.v2 import attributes
from quantum.common import constants
from quantum.common import utils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
@ -116,7 +115,6 @@ def setup_logging(conf):
"""
product_name = "quantum"
logging.setup(product_name)
log_root = logging.getLogger(product_name).logger
LOG.info(_("Logging enabled!"))

@ -20,7 +20,7 @@ Quantum base exception handling.
"""
from quantum.openstack.common.exception import Error
from quantum.openstack.common.exception import InvalidContentType
from quantum.openstack.common.exception import InvalidContentType # noqa
from quantum.openstack.common.exception import OpenstackException

@ -15,17 +15,13 @@
# under the License.
#
from oslo.config import cfg
import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
from sqlalchemy import orm
from sqlalchemy.orm import exc
from sqlalchemy.sql import expression as expr
import webob.exc as w_exc
from quantum.api.v2 import attributes
from quantum.common import exceptions as q_exc
from quantum.db import db_base_plugin_v2
from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import loadbalancer
@ -34,7 +30,6 @@ from quantum import manager
from quantum.openstack.common import log as logging
from quantum.openstack.common import uuidutils
from quantum.plugins.common import constants
from quantum import policy
LOG = logging.getLogger(__name__)
@ -663,10 +658,9 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
tenant_id = self._get_tenant_id_for_create(context, v)
with context.session.begin(subtransactions=True):
pool = None
try:
qry = context.session.query(Pool)
pool = qry.filter_by(id=v['pool_id']).one()
qry.filter_by(id=v['pool_id']).one()
except exc.NoResultFound:
raise loadbalancer.PoolNotFound(pool_id=v['pool_id'])

@ -35,7 +35,6 @@ migration_for_plugins = [
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
from quantum.db import migration

@ -40,7 +40,6 @@ migration_for_plugins = [
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
from quantum.db import migration

@ -31,11 +31,6 @@ down_revision = '1341ed32cc1e'
migration_for_plugins = ['*']
from alembic import op
import sqlalchemy as sa
from quantum.db import migration
def upgrade(active_plugin=None, options=None):
"""A no-op migration for marking the Grizzly release."""

@ -17,14 +17,12 @@
# @author: Mark McClain, DreamHost
import os
import sys
from alembic import command as alembic_command
from alembic import config as alembic_config
from alembic import util as alembic_util
from oslo.config import cfg
from quantum import manager
_core_opts = [
cfg.StrOpt('core_plugin',

@ -18,7 +18,6 @@
import sqlalchemy as sa
from sqlalchemy import event
from sqlalchemy.orm import exc
from quantum.common import exceptions as qexception
from quantum.db import model_base

@ -17,7 +17,6 @@
# @author: Kaiwei Fan, VMware, Inc
import sqlalchemy as sa
from sqlalchemy.orm import exc
from quantum.db import model_base
from quantum.extensions import routerservicetype as rst

@ -747,7 +747,7 @@ class QuantumRestProxyV2(db_base_plugin_v2.QuantumDbPluginV2,
# update network on network controller
try:
self._send_update_network(orig_net)
except RemoteRestError as e:
except RemoteRestError:
# rollback creation of subnet
super(QuantumRestProxyV2, self).delete_subnet(context,
subnet['id'])
@ -770,7 +770,7 @@ class QuantumRestProxyV2(db_base_plugin_v2.QuantumDbPluginV2,
# update network on network controller
try:
self._send_update_network(orig_net)
except RemoteRestError as e:
except RemoteRestError:
# rollback updation of subnet
super(QuantumRestProxyV2, self).update_subnet(context, id,
orig_subnet)
@ -788,7 +788,7 @@ class QuantumRestProxyV2(db_base_plugin_v2.QuantumDbPluginV2,
# update network on network controller
try:
self._send_update_network(orig_net)
except RemoteRestError as e:
except RemoteRestError:
# TODO (Sumit): rollback deletion of subnet
raise
@ -1014,7 +1014,7 @@ class QuantumRestProxyV2(db_base_plugin_v2.QuantumDbPluginV2,
# update network on network controller
try:
self._send_update_network(orig_net)
except RemoteRestError as e:
except RemoteRestError:
# rollback updation of subnet
super(QuantumRestProxyV2, self).update_floatingip(context, id,
orig_fl_ip)
@ -1035,7 +1035,7 @@ class QuantumRestProxyV2(db_base_plugin_v2.QuantumDbPluginV2,
# update network on network controller
try:
self._send_update_network(orig_net)
except RemoteRestError as e:
except RemoteRestError:
# TODO(Sumit): rollback deletion of floating IP
raise

@ -85,12 +85,6 @@ class TestNetworkCtrl(object):
assert id_ <= len(self.matches), 'remove_id: id > len()'
self.matches.pop(id_)
def remove_match(self, prior, method_regexp, uri_regexp):
for i in self.matches:
if (i[0], i[1], i[2]) == (method_regexp, uri_regexp, idstr):
self.remove_id(i)
break
def request_handler(self, method, uri, body):
retstatus = self.default_status
retbody = self.default_response

@ -40,7 +40,6 @@ from quantum.db import agentschedulers_db
from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import l3_db
from quantum.db import l3_rpc_base
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings

@ -18,8 +18,6 @@
import logging as LOG
from oslo.config import cfg
from quantum.plugins.cisco.common import config
from quantum.plugins.cisco.common import cisco_constants as const
from quantum.plugins.cisco.common import cisco_exceptions as cexc
@ -54,7 +52,7 @@ class Store(object):
@staticmethod
def put_credential(cred_name, username, password):
"""Set the username and password"""
credential = cdb.add_credential(TENANT, cred_name, username, password)
cdb.add_credential(TENANT, cred_name, username, password)
@staticmethod
def get_username(cred_name):
@ -71,7 +69,7 @@ class Store(object):
@staticmethod
def get_credential(cred_name):
"""Get the username and password"""
credential = cdb.get_credential_name(TENANT, cred_name)
cdb.get_credential_name(TENANT, cred_name)
return {const.USERNAME: const.CREDENTIAL_USERNAME,
const.PASSWORD: const.CREDENTIAL_PASSWORD}

@ -20,7 +20,6 @@ import hashlib
import logging
from quantum.plugins.cisco.common import cisco_constants as const
from quantum.plugins.cisco.db import l2network_db as cdb
LOG = logging.getLogger(__name__)

@ -102,7 +102,7 @@ def network_id(net_name):
options(joinedload(models.Network.ports)).
filter_by(name=net_name).
all())
except exc.NoResultFound, e:
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_name=net_name)
@ -113,7 +113,7 @@ def network_get(net_id):
options(joinedload(models.Network.ports)). \
filter_by(uuid=net_id).\
one()
except exc.NoResultFound, e:
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=net_id)
@ -147,7 +147,7 @@ def validate_network_ownership(tenant_id, net_id):
filter_by(uuid=net_id).
filter_by(tenant_id=tenant_id).
one())
except exc.NoResultFound, e:
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=net_id)

@ -15,11 +15,10 @@
# under the License.
# @author: Rohit Agarwalla, Cisco Systems, Inc.
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean
from sqlalchemy.orm import relation, object_mapper
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.orm import object_mapper
from quantum.openstack.common import uuidutils
from quantum.plugins.cisco.db import models
from quantum.plugins.cisco.db.models import BASE

@ -24,7 +24,7 @@ from quantum.openstack.common import log as logging
from quantum.plugins.cisco.common import cisco_exceptions as c_exc
from quantum.plugins.cisco.common import config
from quantum.plugins.cisco.db import network_models_v2
from quantum.plugins.cisco.db import nexus_models_v2
from quantum.plugins.cisco.db import nexus_models_v2 # noqa
from quantum.plugins.openvswitch import ovs_models_v2

@ -16,11 +16,11 @@
#
# @author: Rohit Agarwalla, Cisco Systems, Inc.
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean
from sqlalchemy.orm import relation, object_mapper
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.orm import object_mapper
from quantum.db import model_base
from quantum.db import models_v2 as models
from quantum.db import models_v2 as models # noqa
from quantum.openstack.common import uuidutils

@ -19,14 +19,12 @@
# @author: Rohit Agarwalla, Cisco Systems, Inc.
#
from copy import deepcopy
import inspect
import logging
from novaclient.v1_1 import client as nova_client
from oslo.config import cfg
from quantum.db import l3_db
from quantum.manager import QuantumManager
from quantum.openstack.common import importutils
from quantum.plugins.cisco.common import cisco_constants as const
@ -229,9 +227,9 @@ class VirtualPhysicalSwitchModelV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
args = [ovs_output[0]['tenant_id'], id, {'vlan_id': vlan_id},
{'net_admin_state': ovs_output[0]['admin_state_up']},
{'vlan_ids': vlanids}]
nexus_output = self._invoke_plugin_per_device(const.NEXUS_PLUGIN,
self._func_name(),
args)
self._invoke_plugin_per_device(const.NEXUS_PLUGIN,
self._func_name(),
args)
return ovs_output[0]
def delete_network(self, context, id):
@ -300,11 +298,10 @@ class VirtualPhysicalSwitchModelV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
instance_id = port['port']['device_id']
device_owner = port['port']['device_owner']
if conf.CISCO_TEST.host is not None:
host = conf.CISCO_TEST.host
elif device_owner == 'network:dhcp':
return ovs_output[0]
elif instance_id:
create_net = (conf.CISCO_TEST.host is None and
device_owner != 'network:dhcp' and
instance_id)
if create_net:
net_id = port['port']['network_id']
tenant_id = port['port']['tenant_id']
self._invoke_nexus_for_net_create(
@ -368,9 +365,9 @@ class VirtualPhysicalSwitchModelV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
ovs_output = self._invoke_plugin_per_device(const.VSWITCH_PLUGIN,
self._func_name(),
args)
nexus_output = self._invoke_plugin_per_device(const.NEXUS_PLUGIN,
self._func_name(),
n_args)
self._invoke_plugin_per_device(const.NEXUS_PLUGIN,
self._func_name(),
n_args)
return ovs_output[0]
except:
# TODO (asomya): Check if we need to perform any rollback here

@ -27,8 +27,7 @@ from quantum.db import models_v2
from quantum.openstack.common import importutils
from quantum.plugins.cisco.common import cisco_constants as const
from quantum.plugins.cisco.common import cisco_exceptions as cexc
from quantum.plugins.cisco.common import cisco_utils as cutil
from quantum.plugins.cisco.common import config
from quantum.plugins.cisco.common import config # noqa
from quantum.plugins.cisco.db import network_db_v2 as cdb
LOG = logging.getLogger(__name__)
@ -300,7 +299,7 @@ class PluginV2(db_base_plugin_v2.QuantumDbPluginV2):
"""Delete a QoS level"""
LOG.debug(_("delete_qos() called"))
try:
qos_level = cdb.get_qos(tenant_id, qos_id)
cdb.get_qos(tenant_id, qos_id)
except Exception:
raise cexc.QosNotFound(tenant_id=tenant_id,
qos_id=qos_id)
@ -310,7 +309,7 @@ class PluginV2(db_base_plugin_v2.QuantumDbPluginV2):
"""Rename QoS level"""
LOG.debug(_("rename_qos() called"))
try:
qos_level = cdb.get_qos(tenant_id, qos_id)
cdb.get_qos(tenant_id, qos_id)
except Exception:
raise cexc.QosNotFound(tenant_id=tenant_id,
qos_id=qos_id)

@ -21,7 +21,6 @@
Implements a Nexus-OS NETCONF over SSHv2 API Client
"""
import eventlet
import logging
from ncclient import manager

@ -30,7 +30,6 @@ from quantum.common import exceptions as exc
from quantum.openstack.common import importutils
from quantum.plugins.cisco.common import cisco_constants as const
from quantum.plugins.cisco.common import cisco_credentials_v2 as cred
from quantum.plugins.cisco.common import cisco_exceptions as excep
from quantum.plugins.cisco.common import config as conf
from quantum.plugins.cisco.db import network_db_v2 as cdb
from quantum.plugins.cisco.db import nexus_db_v2 as nxos_db

@ -25,7 +25,6 @@ import routes
import webob
from webtest import TestApp
from quantum import api as server
from quantum.api import extensions
from quantum.api.extensions import (
ExtensionMiddleware,

@ -20,10 +20,7 @@ test_database.py is an independent test suite
that tests the database api method calls
"""
import logging as LOG
from quantum.openstack.common import log as logging
from quantum.plugins.cisco.common import cisco_constants as const
import quantum.plugins.cisco.db.api as db
import quantum.plugins.cisco.db.l2network_db as l2network_db
import quantum.plugins.cisco.db.nexus_db_v2 as nexus_db
@ -364,8 +361,8 @@ class NexusDBTest(base.BaseTestCase):
def testb_getall_nexusportbindings(self):
"""get all nexus port binding"""
binding1 = self.dbtest.create_nexusportbinding("port1", 10)
binding2 = self.dbtest.create_nexusportbinding("port2", 10)
self.dbtest.create_nexusportbinding("port1", 10)
self.dbtest.create_nexusportbinding("port2", 10)
bindings = self.dbtest.get_all_nexusportbindings()
count = 0
for bind in bindings:
@ -376,7 +373,7 @@ class NexusDBTest(base.BaseTestCase):
def testc_delete_nexusportbinding(self):
"""delete nexus port binding"""
binding1 = self.dbtest.create_nexusportbinding("port1", 10)
self.dbtest.create_nexusportbinding("port1", 10)
self.dbtest.delete_nexusportbinding(10)
bindings = self.dbtest.get_all_nexusportbindings()
count = 0

@ -23,8 +23,7 @@ from quantum.common import exceptions as q_exc
from quantum.common import topics
from quantum.db import db_base_plugin_v2
from quantum.db import l3_db
# NOTE: quota_db cannot be removed, it is for db model
from quantum.db import quota_db
from quantum.db import quota_db # noqa
from quantum.extensions import portbindings
from quantum.extensions import providernet as provider
from quantum.openstack.common import log as logging

@ -42,8 +42,7 @@ from quantum import context
from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall
from quantum.openstack.common.rpc import dispatcher
# NOTE (e0ne): this import is needed for config init
from quantum.plugins.linuxbridge.common import config
from quantum.plugins.linuxbridge.common import config # noqa
from quantum.plugins.linuxbridge.common import constants as lconst

@ -22,8 +22,7 @@ from quantum.db import models_v2
from quantum.db import securitygroups_db as sg_db
from quantum import manager
from quantum.openstack.common import log as logging
# NOTE (e0ne): this import is needed for config init
from quantum.plugins.linuxbridge.common import config
from quantum.plugins.linuxbridge.common import config # noqa
from quantum.plugins.linuxbridge.common import constants
from quantum.plugins.linuxbridge.db import l2network_models_v2

@ -25,7 +25,6 @@ from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum.common import utils
from quantum.db import agents_db
from quantum.db import agentschedulers_db
from quantum.db import api as db_api
@ -33,12 +32,10 @@ from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import extraroute_db
from quantum.db import l3_rpc_base
# NOTE: quota_db cannot be removed, it is for db model
from quantum.db import quota_db
from quantum.db import quota_db # noqa
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings
from quantum.extensions import providernet as provider
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc

@ -26,8 +26,7 @@ from quantum.db import models_v2
from quantum.extensions.flavor import (FLAVOR_NETWORK, FLAVOR_ROUTER)
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
# NOTE (e0ne): this import is needed for config init
from quantum.plugins.metaplugin.common import config
from quantum.plugins.metaplugin.common import config # noqa
from quantum.plugins.metaplugin import meta_db_v2
from quantum.plugins.metaplugin.meta_models_v2 import (NetworkFlavor,
RouterFlavor)

@ -218,13 +218,11 @@ class RuleManager:
protocol = rule['protocol']
port_range_max = rule['port_range_max']
rule_id = rule['id']
ethertype = rule['ethertype']
security_group_id = rule['security_group_id']
remote_group_id = rule['remote_group_id']
remote_ip_prefix = rule['remote_ip_prefix'] # watch out. not validated
tenant_id = rule['tenant_id']
port_range_min = rule['port_range_min']
external_id = rule['external_id']
# construct a corresponding rule
tp_src_start = tp_src_end = None
@ -246,11 +244,11 @@ class RuleManager:
tp_dst_start, tp_dst_end = port_range_min, port_range_max
# protocol
if rule['protocol'] == 'tcp':
if protocol == 'tcp':
nw_proto = 6
elif rule['protocol'] == 'udp':
elif protocol == 'udp':
nw_proto = 17
elif rule['protocol'] == 'icmp':
elif protocol == 'icmp':
nw_proto = 1
# extract type and code from reporposed fields
icmp_type = rule['from_port']

@ -25,7 +25,6 @@ from oslo.config import cfg
from webob import exc as w_exc
from quantum.common import exceptions as q_exc
from quantum.common.utils import find_config_file
from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.db import l3_db
@ -33,7 +32,7 @@ from quantum.db import models_v2
from quantum.db import securitygroups_db
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log as logging
from quantum.plugins.midonet import config
from quantum.plugins.midonet import config # noqa
from quantum.plugins.midonet import midonet_lib

@ -63,7 +63,7 @@ class NECPluginApi(agent_rpc.PluginApi):
datapath_id=datapath_id,
port_added=port_added,
port_removed=port_removed))
except Exception as e:
except Exception:
LOG.warn(_("update_ports() failed."))
return

@ -18,8 +18,7 @@
from oslo.config import cfg
from quantum.agent.common import config
# import rpc config options
from quantum.openstack.common import rpc
from quantum.openstack.common import rpc # noqa
from quantum import scheduler

@ -24,8 +24,7 @@ from quantum.db import securitygroups_db as sg_db
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.openstack.common import log as logging
# NOTE (e0ne): this import is needed for config init
from quantum.plugins.nec.common import config
from quantum.plugins.nec.common import config # noqa
from quantum.plugins.nec.common import exceptions as nexc
from quantum.plugins.nec.db import models as nmodels
@ -221,7 +220,7 @@ def get_port_from_device(port_id):
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict[ext_sg.SECURITYGROUPS] = [
sg_id for port, sg_id in port_and_sgs if sg_id]
sg_id for port_, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address']

@ -76,7 +76,7 @@ class PFCDriverBase(ofc_driver_base.OFCDriverBase):
def create_tenant(self, description, tenant_id=None):
ofc_tenant_id = self._generate_pfc_id(tenant_id)
body = {'id': ofc_tenant_id}
res = self.client.post('/tenants', body=body)
self.client.post('/tenants', body=body)
return '/tenants/' + ofc_tenant_id
def delete_tenant(self, ofc_tenant_id):

@ -19,21 +19,16 @@
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api
from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum import context
from quantum.db import agents_db
from quantum.db import agentschedulers_db
from quantum.db import dhcp_rpc_base
from quantum.db import extraroute_db
from quantum.db import l3_rpc_base
#NOTE(amotoki): quota_db cannot be removed, it is for db model
from quantum.db import quota_db
from quantum.db import quota_db # noqa
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
@ -670,7 +665,6 @@ class NECPluginV2RPCCallbacks(object):
"""
LOG.debug(_("NECPluginV2RPCCallbacks.update_ports() called, "
"kwargs=%s ."), kwargs)
topic = kwargs['topic']
datapath_id = kwargs['datapath_id']
session = rpc_context.session
for p in kwargs.get('port_added', []):

@ -132,5 +132,5 @@ class OFCManager(object):
ofc_pf_id = self._get_ofc_id(context, "ofc_packet_filter", filter_id)
ofc_pf_id = self.driver.convert_ofc_filter_id(context, ofc_pf_id)
res = self.driver.delete_filter(ofc_pf_id)
self.driver.delete_filter(ofc_pf_id)
self._del_ofc_item(context, "ofc_packet_filter", filter_id)

@ -43,8 +43,7 @@ from quantum.db import dhcp_rpc_base
from quantum.db import l3_db
from quantum.db import models_v2
from quantum.db import portsecurity_db
# NOTE: quota_db cannot be removed, it is for db model
from quantum.db import quota_db
from quantum.db import quota_db # noqa
from quantum.db import securitygroups_db
from quantum.extensions import l3
from quantum.extensions import portsecurity as psec

@ -32,7 +32,6 @@ from quantum.db import models_v2
from quantum.openstack.common import log as logging
from quantum.openstack.common import uuidutils
from quantum.plugins.nicira.nicira_nvp_plugin.extensions import nvp_networkgw
from quantum import policy
LOG = logging.getLogger(__name__)

@ -40,7 +40,7 @@ from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall
from quantum.openstack.common.rpc import dispatcher
from quantum.plugins.openvswitch.common import config
from quantum.plugins.openvswitch.common import config # noqa
from quantum.plugins.openvswitch.common import constants

@ -18,8 +18,6 @@
from sqlalchemy.orm import exc
from oslo.config import cfg
from quantum.common import exceptions as q_exc
import quantum.db.api as db
from quantum.db import models_v2
@ -329,7 +327,7 @@ def get_port_from_device(port_id):
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict[ext_sg.SECURITYGROUPS] = [
sg_id for port, sg_id in port_and_sgs if sg_id]
sg_id for port_, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address']

@ -38,17 +38,15 @@ from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import extraroute_db
from quantum.db import l3_rpc_base
# NOTE: quota_db cannot be removed, it is for db model
from quantum.db import quota_db
from quantum.db import quota_db # noqa
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings
from quantum.extensions import providernet as provider
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.plugins.openvswitch.common import config
from quantum.plugins.openvswitch.common import config # noqa
from quantum.plugins.openvswitch.common import constants
from quantum.plugins.openvswitch import ovs_db_v2
from quantum import policy

@ -21,8 +21,6 @@ This plugin will forward authenticated REST API calls
to the Network Operating System by PLUMgrid called NOS
"""
import sys
from oslo.config import cfg
from quantum.db import api as db
@ -68,8 +66,8 @@ class QuantumPluginPLUMgridV2(db_base_plugin_v2.QuantumDbPluginV2):
self.snippets = plumgrid_nos_snippets.DataNOSPLUMgrid()
# TODO: (Edgar) These are placeholders for next PLUMgrid release
nos_username = cfg.CONF.PLUMgridNOS.username
nos_password = cfg.CONF.PLUMgridNOS.password
cfg.CONF.PLUMgridNOS.username
cfg.CONF.PLUMgridNOS.password
self.rest_conn = rest_connection.RestConnection(nos_plumgrid,
nos_port, timeout)
if self.rest_conn is None:
@ -131,10 +129,6 @@ class QuantumPluginPLUMgridV2(db_base_plugin_v2.QuantumDbPluginV2):
self._network_admin_state(network)
tenant_id = self._get_tenant_id_for_create(context, network["network"])
# Get initial network details
original_net = super(QuantumPluginPLUMgridV2, self).get_network(
context, net_id)
with context.session.begin(subtransactions=True):
# Plugin DB - Network Update
new_network = super(
@ -169,8 +163,8 @@ class QuantumPluginPLUMgridV2(db_base_plugin_v2.QuantumDbPluginV2):
with context.session.begin(subtransactions=True):
# Plugin DB - Network Delete
net_deleted = super(QuantumPluginPLUMgridV2,
self).delete_network(context, net_id)
super(QuantumPluginPLUMgridV2, self).delete_network(context,
net_id)
try:
nos_url = self.snippets.BASE_NOS_URL + net_id

@ -82,7 +82,7 @@ class RestConnection(object):
raise plum_excep.PLUMgridException(err_message)
ret = (resp.status, resp.reason, resp_str)
except urllib2.HTTPError, e:
except urllib2.HTTPError:
LOG.error(_('PLUMgrid_NOS_Server: %(action)s failure, %(e)r'))
ret = 0, None, None, None
conn.close()

@ -43,7 +43,7 @@ from quantum import context as q_context
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log
from quantum.openstack.common.rpc import dispatcher
from quantum.plugins.ryu.common import config
from quantum.plugins.ryu.common import config # noqa
LOG = log.getLogger(__name__)

@ -53,7 +53,7 @@ def get_port_from_device(port_id):
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict[ext_sg.SECURITYGROUPS] = [
sg_id for port, sg_id in port_and_sgs if sg_id]
sg_id for port_, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address'] for ip in port['fixed_ips']]

@ -32,11 +32,10 @@ from quantum.db import extraroute_db
from quantum.db import l3_rpc_base
from quantum.db import models_v2
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.plugins.ryu.common import config
from quantum.plugins.ryu.common import config # noqa
from quantum.plugins.ryu.db import api_v2 as db_api_v2

@ -77,7 +77,6 @@ class HaproxyNSDriver(object):
namespace = get_ns_name(pool_id)
ns = ip_lib.IPWrapper(self.root_helper, namespace)
pid_path = self._get_state_file_path(pool_id, 'pid')
sock_path = self._get_state_file_path(pool_id, 'sock')
# kill the process
kill_pids_in_file(self.root_helper, pid_path)

@ -86,7 +86,7 @@ class ChanceScheduler(object):
# check if the configuration of l3 agent is compatible
# with the router
router_ids = [router_id[0] for router_id in router_ids]
router_ids = [router_id_[0] for router_id_ in router_ids]
routers = plugin.get_routers(context, filters={'id': router_ids})
to_removed_ids = []
for router in routers:

@ -76,7 +76,7 @@ to the aid of their party.\"\n")
self.assertEqual(result,
"0 arg Now is the time for all good men to \
come to the aid of their party.")
except Exception, ex:
except Exception:
LOG.exception("Losing in rootwrap test")
def tearDown(self):

@ -68,7 +68,7 @@ class BigSwitchProxyPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=HTTPConnectionMock)
self.addCleanup(self.httpPatch.stop)
MockHTTPConnection = self.httpPatch.start()
self.httpPatch.start()
super(BigSwitchProxyPluginV2TestCase,
self).setUp(self._plugin_name)

@ -156,7 +156,7 @@ class RouterDBTestCase(test_l3_plugin.L3NatDBTestCase):
with self.subnet(cidr='10.0.10.0/24') as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])
with self.port() as private_port:
with self.router() as r:
with self.router():
res = self._create_floatingip(
'json',
public_sub['subnet']['network_id'],
@ -226,7 +226,7 @@ class RouterDBTestCase(test_l3_plugin.L3NatDBTestCase):
def test_router_remove_interface_wrong_port_returns_404(self):
with self.router() as r:
with self.subnet(cidr='10.0.10.0/24') as s:
with self.subnet(cidr='10.0.10.0/24'):
with self.port(no_delete=True) as p:
self._router_interface_action('add',
r['router']['id'],

@ -17,11 +17,9 @@ import logging
import mock
from quantum import context
from quantum.db import api as db
from quantum.manager import QuantumManager
from quantum.plugins.cisco.common import cisco_constants as const
from quantum.plugins.cisco.db import network_db_v2
from quantum.plugins.cisco.db import network_models_v2
from quantum.plugins.cisco.db import network_db_v2 # noqa
from quantum.tests.unit import test_db_plugin
LOG = logging.getLogger(__name__)

@ -18,7 +18,6 @@ import mock
from quantum.db import api as db
from quantum.openstack.common import importutils
from quantum.plugins.cisco.common import cisco_constants as const
from quantum.plugins.cisco.db import network_models_v2
from quantum.plugins.cisco.nexus import cisco_nexus_plugin_v2
from quantum.tests import base

@ -15,35 +15,23 @@
import contextlib
import logging
import mock
import os
import testtools
from oslo.config import cfg
import webob.exc
from quantum import context
from quantum.api.extensions import ExtensionMiddleware
from quantum.api.extensions import PluginAwareExtensionManager
from quantum.api.v2 import attributes
from quantum.api.v2.router import APIRouter
from quantum.common import config
from quantum.common import exceptions as q_exc
from quantum.common.test_lib import test_config
from quantum.db import api as db
from quantum.db.loadbalancer import loadbalancer_db as ldb
import quantum.extensions
from quantum.extensions import loadbalancer
from quantum.manager import QuantumManager
from quantum.plugins.common import constants
from quantum.plugins.services.agent_loadbalancer import (
plugin as loadbalancer_plugin
)
from quantum.tests.unit import test_db_plugin
from quantum.tests.unit import test_extensions
from quantum.tests.unit import testlib_api
from quantum.tests.unit.testlib_api import create_request
from quantum import wsgi
LOG = logging.getLogger(__name__)
@ -306,7 +294,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
""" Test loadbalancer db plugin via extension and directly """
with self.subnet() as subnet:
with self.pool(name="pool1") as pool:
with self.vip(name='vip1', subnet=subnet, pool=pool) as vip:
with self.vip(name='vip1', subnet=subnet, pool=pool):
vip_data = {
'name': 'vip1',
'pool_id': pool['pool']['id'],
@ -468,7 +456,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
self.assertEqual(res['vip'][k], v)
def test_delete_vip(self):
with self.pool() as pool:
with self.pool():
with self.vip(no_delete=True) as vip:
req = self.new_delete_request('vips',
vip['vip']['id'])
@ -600,7 +588,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
def test_delete_pool(self):
with self.pool(no_delete=True) as pool:
with self.member(no_delete=True,
pool_id=pool['pool']['id']) as member:
pool_id=pool['pool']['id']):
req = self.new_delete_request('pools',
pool['pool']['id'])
res = req.get_response(self.ext_api)

@ -20,8 +20,6 @@
Unit tests for Windows Hyper-V virtual switch quantum driver
"""
import sys
import mock
from oslo.config import cfg

@ -15,8 +15,7 @@
from oslo.config import cfg
#NOTE this import loads tests required options
from quantum.plugins.linuxbridge.common import config
from quantum.plugins.linuxbridge.common import config # noqa
from quantum.tests import base

@ -46,14 +46,14 @@ class TestLinuxBridge(base.BaseTestCase):
def test_ensure_physical_in_bridge_flat(self):
with mock.patch.object(self.linux_bridge,
'ensure_flat_bridge') as flat_bridge_func:
result = self.linux_bridge.ensure_physical_in_bridge(
self.linux_bridge.ensure_physical_in_bridge(
'network_id', 'physnet1', lconst.FLAT_VLAN_ID)
self.assertTrue(flat_bridge_func.called)
def test_ensure_physical_in_bridge_vlan(self):
with mock.patch.object(self.linux_bridge,
'ensure_vlan_bridge') as vlan_bridge_func:
result = self.linux_bridge.ensure_physical_in_bridge(
self.linux_bridge.ensure_physical_in_bridge(
'network_id', 'physnet1', 7)
self.assertTrue(vlan_bridge_func.called)

@ -25,7 +25,6 @@ import testtools
from quantum import context
from quantum.db import api as db
from quantum.db import models_v2
from quantum.extensions.flavor import (FLAVOR_NETWORK, FLAVOR_ROUTER)
from quantum.openstack.common import uuidutils
from quantum.plugins.metaplugin.meta_quantum_plugin import FlavorNotFound
@ -84,7 +83,6 @@ class MetaQuantumPluginV2Test(base.BaseTestCase):
self.mox = mox.Mox()
self.stubs = stubout.StubOutForTesting()
args = ['--config-file', etcdir('quantum.conf.test')]
self.client_cls_p = mock.patch('quantumclient.v2_0.client.Client')
client_cls = self.client_cls_p.start()
self.client_inst = mock.Mock()

@ -24,10 +24,7 @@ import sys
import uuid
import mock
from webob import exc as w_exc
import quantum.common.test_lib as test_lib
import quantum.tests.unit.midonet as midonet
import quantum.tests.unit.test_db_plugin as test_plugin
@ -116,7 +113,7 @@ class TestMidonetNetworksV2(test_plugin.TestNetworksV2,
super(TestMidonetNetworksV2, self).test_update_network()
def test_list_networks(self):
bridge = self._setup_bridge_mock()
self._setup_bridge_mock()
with self.network(name='net1') as net1:
req = self.new_list_request('networks')
res = self.deserialize('json', req.get_response(self.api))
@ -195,12 +192,6 @@ class TestMidonetNetworksV2(test_plugin.TestNetworksV2,
def test_list_networks_with_pagination_reverse_emulated(self):
pass
def test_list_networks_with_parameters(self):
pass
def test_list_networks_with_parameters_invalid_values(self):
pass
def test_list_networks_with_sort_emulated(self):
pass
@ -221,9 +212,6 @@ class TestMidonetSubnetsV2(test_plugin.TestSubnetsV2,
def test_create_two_subnets_same_cidr_returns_400(self):
pass
def test_create_two_subnets_same_cidr_returns_400(self):
pass
def test_create_subnet_bad_V4_cidr(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_bad_V4_cidr()

@ -21,7 +21,7 @@ from quantum.db import api as db_api
from quantum.openstack.common import uuidutils
from quantum.plugins.nec.common import exceptions as nexc
from quantum.plugins.nec.db import api as ndb
from quantum.plugins.nec.db import models as nmodels
from quantum.plugins.nec.db import models as nmodels # noqa
from quantum.tests import base

@ -19,7 +19,7 @@ from quantum import context
from quantum.openstack.common import uuidutils
from quantum.plugins.nec.common import config
from quantum.plugins.nec.db import api as ndb
from quantum.plugins.nec.db import models as nmodels
from quantum.plugins.nec.db import models as nmodels # noqa
from quantum.plugins.nec import ofc_manager
from quantum.tests import base

@ -22,7 +22,7 @@ import mock
from quantum.api.v2 import attributes
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.plugins.nec.db import api as ndb
from quantum.plugins.nec.db import api as ndb # noqa
from quantum.tests.unit import test_extension_security_group as test_sg
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc

@ -275,7 +275,7 @@ class TremaIdConvertTest(base.BaseTestCase):
t_id = 'dummy'
ofc_t_id, ofc_n_id = generate_random_ids(2)
ofc_n_id = '/networks/%s' % ofc_n_id
ret = self.driver.convert_ofc_network_id(self.ctx, ofc_n_id, t_id)
self.driver.convert_ofc_network_id(self.ctx, ofc_n_id, t_id)
def test_convert_filter_id(self):
ofc_f_id = generate_random_ids(1)

@ -15,7 +15,7 @@
from oslo.config import cfg
from quantum.plugins.nicira.nicira_nvp_plugin.common import config
from quantum.plugins.nicira.nicira_nvp_plugin.common import config # noqa
from quantum.tests import base

@ -17,7 +17,6 @@ import contextlib
import mock
from oslo.config import cfg
import testtools
from webob import exc
import webtest

@ -222,8 +222,8 @@ class TestNiciraNetworksV2(test_plugin.TestNetworksV2,
self.assertEqual(net_del_res.status_int, 204)
def test_list_networks_with_shared(self):
with self.network(name='net1') as net1:
with self.network(name='net2', shared=True) as net2:
with self.network(name='net1'):
with self.network(name='net2', shared=True):
req = self.new_list_request('networks')
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(len(res['networks']), 2)
@ -478,10 +478,10 @@ class TestNiciraL3NatTestCase(test_l3_plugin.L3NatDBTestCase,
self._nvp_metadata_setup()
with self.router() as r:
with self.subnet() as s:
body = self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None)
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None)
r_ports = self._list('ports')['ports']
self.assertEqual(len(r_ports), 2)
ips = []
@ -491,10 +491,10 @@ class TestNiciraL3NatTestCase(test_l3_plugin.L3NatDBTestCase,
meta_cidr = netaddr.IPNetwork('169.254.0.0/16')
self.assertTrue(any([ip in meta_cidr for ip in ips]))
# Needed to avoid 409
body = self._router_interface_action('remove',
r['router']['id'],
s['subnet']['id'],
None)
self._router_interface_action('remove',
r['router']['id'],
s['subnet']['id'],
None)
self._nvp_metadata_teardown()
def test_metadatata_network_removed_with_router_interface_remove(self):
@ -531,7 +531,7 @@ class TestNiciraL3NatTestCase(test_l3_plugin.L3NatDBTestCase,
subnets = self._list('subnets')['subnets']
with self.subnet() as s:
with self.port(subnet=s, device_id='1234',
device_owner='network:dhcp') as p:
device_owner='network:dhcp'):
subnets = self._list('subnets')['subnets']
self.assertEqual(len(subnets), 1)
self.assertEquals(subnets[0]['host_routes'][0]['nexthop'],

@ -15,8 +15,7 @@
from oslo.config import cfg
#NOTE this import loads tests required options
from quantum.plugins.openvswitch.common import config
from quantum.plugins.openvswitch.common import config # noqa
from quantum.tests import base

@ -58,7 +58,7 @@ class OVS_Lib_Test(base.BaseTestCase):
self.assertEqual(port.switch.br_name, self.BR_NAME)
# test __str__
foo = str(port)
str(port)
self.mox.VerifyAll()
@ -302,9 +302,8 @@ class OVS_Lib_Test(base.BaseTestCase):
self.assertEqual(ovs_lib.get_bridge_for_iface(root_helper, iface), br)
self.mox.VerifyAll()
def test_iface_to_br(self):
def test_iface_to_br_handles_ovs_vsctl_exception(self):
iface = 'tap0'
br = 'br-int'
root_helper = 'sudo'
utils.execute(["ovs-vsctl", self.TO, "iface-to-br", iface],
root_helper=root_helper).AndRaise(Exception)
@ -327,7 +326,6 @@ class OVS_Lib_Test(base.BaseTestCase):
'ca:fe:de:ad:be:ef', 'br')
port2 = ovs_lib.VifPort('tap5678', 2, uuidutils.generate_uuid(),
'ca:ee:de:ad:be:ef', 'br')
ports = [port1, port2]
self.mox.StubOutWithMock(self.br, 'get_vif_ports')
self.br.get_vif_ports().AndReturn([port1, port2])
self.mox.StubOutWithMock(self.br, 'delete_port')

@ -22,7 +22,6 @@ from oslo.config import cfg
from quantum.agent.linux import ip_lib
from quantum.agent.linux import ovs_lib
from quantum.agent.linux import utils
from quantum.agent import rpc
from quantum.openstack.common import log
from quantum.plugins.openvswitch.agent import ovs_quantum_agent
from quantum.plugins.openvswitch.common import constants
@ -129,10 +128,10 @@ class TunnelTest(base.BaseTestCase):
def testConstruct(self):
self.mox.ReplayAll()
b = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1', self.NET_MAPPING,
'sudo', 2, True)
ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1', self.NET_MAPPING,
'sudo', 2, True)
self.mox.VerifyAll()
def testProvisionLocalVlan(self):

@ -17,8 +17,7 @@
from oslo.config import cfg
#NOTE this import loads tests required options
from quantum.plugins.ryu.common import config
from quantum.plugins.ryu.common import config # noqa
from quantum.tests import base

@ -18,14 +18,10 @@
from contextlib import nested
import operator
from oslo.config import cfg
from quantum.db import api as db
# NOTE: this import is needed for correct plugin code work
from quantum.plugins.ryu.common import config
from quantum.plugins.ryu.common import config # noqa
from quantum.plugins.ryu.db import api_v2 as db_api_v2
# NOTE: this import is needed for correct plugin code work
from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
from quantum.plugins.ryu.db import models_v2 as ryu_models_v2 # noqa
from quantum.tests.unit import test_db_plugin as test_plugin

@ -13,10 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
# NOTE: this import is needed for correct plugin code work
from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
from quantum.plugins.ryu.db import models_v2 as ryu_models_v2 # noqa
from quantum.tests.unit.ryu import fake_ryu
from quantum.tests.unit import test_db_plugin as test_plugin

@ -22,7 +22,6 @@ import mock
from quantum.api.v2 import attributes
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.plugins.ryu.db import api_v2 as api_db_v2
from quantum.tests.unit.ryu import fake_ryu
from quantum.tests.unit import test_extension_security_group as test_sg
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc

@ -19,7 +19,6 @@
import contextlib
import mock
from oslo.config import cfg
import testtools
from quantum.plugins.services.agent_loadbalancer import agent
from quantum.tests import base

@ -135,7 +135,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
def test_get_logical_device_inactive(self):
with self.pool() as pool:
with self.vip(pool=pool) as vip:
with self.member(pool_id=vip['vip']['pool_id']) as member:
with self.member(pool_id=vip['vip']['pool_id']):
self.assertRaises(
Exception,
self.callbacks.get_logical_device,
@ -187,7 +187,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
with self.pool() as pool:
with self.vip(pool=pool) as vip:
with self.member(pool_id=vip['vip']['pool_id']) as member:
with self.member(pool_id=vip['vip']['pool_id']):
ctx = context.get_admin_context()
func(ctx, port_id=vip['vip']['port_id'], **kwargs)

@ -83,8 +83,6 @@ class TestOVSCleanup(base.BaseTestCase):
def test_delete_quantum_ports(self):
ports = ['tap1234', 'tap5678', 'tap09ab']
port_found = [True, False, True]
delete_ports = [p for p, found
in itertools.izip(ports, port_found) if found]
with contextlib.nested(
mock.patch.object(ip_lib, 'device_exists',
side_effect=port_found),

@ -16,7 +16,6 @@
# under the License.
import mock
from oslo.config import cfg
from quantum.agent import rpc
from quantum.openstack.common import context
@ -77,5 +76,5 @@ class AgentRPCMethods(base.BaseTestCase):
call_to_patch = 'quantum.openstack.common.rpc.create_connection'
with mock.patch(call_to_patch) as create_connection:
conn = rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
create_connection.assert_has_calls(expected)

@ -20,7 +20,6 @@ import urlparse
import mock
from oslo.config import cfg
from testtools import matchers
import webob
from webob import exc
import webtest
@ -31,7 +30,6 @@ from quantum.api.v2 import attributes
from quantum.api.v2 import base as v2_base
from quantum.api.v2 import router
from quantum.common import config
from quantum.common import constants
from quantum.common import exceptions as q_exc
from quantum import context
from quantum.manager import QuantumManager
@ -39,7 +37,6 @@ from quantum.openstack.common.notifier import api as notifer_api
from quantum.openstack.common import uuidutils
from quantum.tests import base
from quantum.tests.unit import testlib_api
from quantum import wsgi
ROOTDIR = os.path.dirname(os.path.dirname(__file__))

@ -17,7 +17,7 @@ import os
from oslo.config import cfg
from quantum.common import config
from quantum.common import config # noqa
from quantum.tests import base

@ -23,7 +23,6 @@ import random
import mock
from oslo.config import cfg
import sqlalchemy as sa
import testtools
from testtools import matchers
import webob.exc
@ -355,7 +354,6 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
def _create_port(self, fmt, net_id, expected_res_status=None,
arg_list=None, **kwargs):
content_type = 'application/' + fmt
data = {'port': {'network_id': net_id,
'tenant_id': self._tenant_id}}
@ -1058,12 +1056,10 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
self.assertEqual(port['port']['id'], sport['port']['id'])
def test_delete_port(self):
port_id = None
with self.port() as port:
port_id = port['port']['id']
req = self.new_show_request('port', self.fmt, port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, 404)
req = self.new_show_request('port', self.fmt, port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, 404)
def test_delete_port_public_network(self):
with self.network(shared=True) as network:
@ -1074,7 +1070,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
set_context=True)
port = self.deserialize(self.fmt, port_res)
port_id = port['port']['id']
# delete the port
self._delete('ports', port['port']['id'])
# Todo: verify!!!
@ -1241,23 +1236,18 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
kwargs = {"mac_address": mac}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_mac_generation(self):
cfg.CONF.set_override('base_mac', "12:34:56:00:00:00")
with self.port() as port:
mac = port['port']['mac_address']
# check that MAC address matches base MAC
base_mac = cfg.CONF.base_mac
self.assertTrue(mac.startswith("12:34:56"))
def test_mac_generation_4octet(self):
cfg.CONF.set_override('base_mac', "12:34:56:78:00:00")
with self.port() as port:
mac = port['port']['mac_address']
# check that MAC address matches base MAC
base_mac = cfg.CONF.base_mac
self.assertTrue(mac.startswith("12:34:56:78"))
def test_bad_mac_format(self):
@ -1296,7 +1286,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
'ip_address': ips[0]['ip_address']}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_requested_subnet_delete(self):
@ -1492,7 +1481,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
'ip_address': '1011.0.0.5'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_requested_split(self):
@ -1539,7 +1527,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
'ip_address': '10.0.0.5'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_fixed_ip_invalid_subnet_id(self):
@ -1549,7 +1536,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
'ip_address': '10.0.0.5'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_fixed_ip_invalid_ip(self):
@ -1559,7 +1545,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
'ip_address': '10.0.0.55555'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_requested_ips_only(self):
@ -1676,7 +1661,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
matchers.GreaterThan(datetime.timedelta(seconds=10)))
def test_port_delete_holds_ip(self):
plugin = QuantumManager.get_plugin()
base_class = db_base_plugin_v2.QuantumDbPluginV2
with mock.patch.object(base_class, '_hold_ip') as hold_ip:
with self.subnet() as subnet:
@ -1822,8 +1806,6 @@ class TestNetworksV2(QuantumDbPluginV2TestCase):
def test_create_public_network_no_admin_tenant(self):
name = 'public_net'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', True)]
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
with self.network(name=name,
@ -2489,7 +2471,6 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
network_id = network['network']['id']
subnet = self._make_subnet(self.fmt, network, gateway_ip,
cidr, ip_version=4)
self._create_port(self.fmt,
@ -2501,7 +2482,7 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
def test_delete_subnet_port_exists_owned_by_other(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
with self.port(subnet=subnet):
id = subnet['subnet']['id']
req = self.new_delete_request('subnets', id)
res = req.get_response(self.api)
@ -2517,8 +2498,7 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
subnet = self._make_subnet(self.fmt, network, gateway_ip,
cidr, ip_version=4)
self._make_subnet(self.fmt, network, gateway_ip, cidr, ip_version=4)
req = self.new_delete_request('networks', network['network']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, 204)
@ -2669,8 +2649,7 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
subnet = self._test_create_subnet(expected=expected,
gateway_ip=gateway)
self._test_create_subnet(expected=expected, gateway_ip=gateway)
# Gateway is last IP in range
gateway = '10.0.0.254'
allocation_pools = [{'start': '10.0.0.1',
@ -2678,8 +2657,7 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
subnet = self._test_create_subnet(expected=expected,
gateway_ip=gateway)
self._test_create_subnet(expected=expected, gateway_ip=gateway)
# Gateway is first in subnet
gateway = '10.0.0.1'
allocation_pools = [{'start': '10.0.0.2',
@ -2687,8 +2665,8 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
subnet = self._test_create_subnet(expected=expected,
gateway_ip=gateway)
self._test_create_subnet(expected=expected,
gateway_ip=gateway)
def test_create_force_subnet_gw_values(self):
cfg.CONF.set_override('force_gateway_on_subnet', True)
@ -2741,7 +2719,6 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 201)
port = self.deserialize(self.fmt, res)
port_id = port['port']['id']
# delete the port
self._delete('ports', port['port']['id'])
@ -2751,7 +2728,6 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 201)
port = self.deserialize(self.fmt, res)
port_id = port['port']['id']
# delete the port
self._delete('ports', port['port']['id'])
cfg.CONF.set_override('dhcp_lease_duration', 120)

@ -97,8 +97,8 @@ class TestDhcpRpcCallackMixin(base.BaseTestCase):
expectations = [
mock.call.update_port(mock.ANY, 'port_id', dict(port=port_retval))]
retval = self._test_get_dhcp_port_helper(port_retval, expectations,
update_port=port_retval)
self._test_get_dhcp_port_helper(port_retval, expectations,
update_port=port_retval)
self.assertEqual(len(self.log.mock_calls), 1)
def test_get_dhcp_port_create_new(self):

@ -333,13 +333,6 @@ class TestDebugCommands(base.BaseTestCase):
with mock.patch('quantum.agent.linux.ip_lib.IpNetnsCommand') as ns:
cmd.run(parsed_args)
ns.assert_has_calls([mock.call.execute(mock.ANY)])
fake_port = {'port':
{'device_owner': DEVICE_OWNER_NETWORK_PROBE,
'admin_state_up': True,
'network_id': 'fake_net',
'tenant_id': 'fake_tenant',
'fixed_ips': [{'subnet_id': 'fake_subnet'}],
'device_id': socket.gethostname()}}
expected = [mock.call.list_ports(),
mock.call.list_ports(
network_id='fake_net',

@ -113,9 +113,7 @@ fake_down_network = FakeModel('12345678-dddd-dddd-1234567890ab',
class TestDhcpAgent(base.BaseTestCase):
def setUp(self):
super(TestDhcpAgent, self).setUp()
cfg.CONF.register_opts(dhcp_agent.DeviceManager.OPTS)
cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
cfg.CONF.register_opts(dhcp_agent.DhcpLeaseRelay.OPTS)
dhcp_agent.register_options()
cfg.CONF.set_override('interface_driver',
'quantum.agent.linux.interface.NullDriver')
self.driver_cls_p = mock.patch(
@ -182,7 +180,7 @@ class TestDhcpAgent(base.BaseTestCase):
mock.call().wait()])
def test_run_completes_single_pass(self):
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
with mock.patch('quantum.agent.dhcp_agent.DeviceManager'):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in
@ -195,13 +193,13 @@ class TestDhcpAgent(base.BaseTestCase):
[mock.call.start()])
def test_ns_name(self):
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
with mock.patch('quantum.agent.dhcp_agent.DeviceManager'):
mock_net = mock.Mock(id='foo')
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
self.assertEqual(dhcp._ns_name(mock_net), 'qdhcp-foo')
def test_ns_name_disabled_namespace(self):
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
with mock.patch('quantum.agent.dhcp_agent.DeviceManager'):
cfg.CONF.set_override('use_namespaces', False)
mock_net = mock.Mock(id='foo')
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
@ -986,10 +984,6 @@ class TestDeviceManager(base.BaseTestCase):
fake_port = FakeModel('12345678-1234-aaaa-1234567890ab',
mac_address='aa:bb:cc:dd:ee:ff')
expected_driver_calls = [mock.call(cfg.CONF),
mock.call().get_device_name(fake_network),
mock.call().unplug('tap12345678-12')]
with mock.patch('quantum.agent.linux.interface.NullDriver') as dvr_cls:
mock_driver = mock.MagicMock()
#mock_driver.DEV_NAME_LEN = (
@ -1018,10 +1012,6 @@ class TestDeviceManager(base.BaseTestCase):
fake_port = FakeModel('12345678-1234-aaaa-1234567890ab',
mac_address='aa:bb:cc:dd:ee:ff')
expected_driver_calls = [mock.call(cfg.CONF),
mock.call().get_device_name(fake_network),
mock.call().unplug('tap12345678-12')]
with mock.patch('quantum.agent.linux.interface.NullDriver') as dvr_cls:
mock_driver = mock.MagicMock()
mock_driver.get_device_name.return_value = 'tap12345678-12'
@ -1046,10 +1036,6 @@ class TestDeviceManager(base.BaseTestCase):
fake_port = FakeModel('12345678-1234-aaaa-1234567890ab',
mac_address='aa:bb:cc:dd:ee:ff')
expected_driver_calls = [mock.call(cfg.CONF),
mock.call().get_device_name(fake_network),
mock.call().unplug('tap12345678-12')]
with mock.patch('quantum.agent.linux.interface.NullDriver') as dvr_cls:
mock_driver = mock.MagicMock()
mock_driver.get_device_name.return_value = 'tap12345678-12'
@ -1100,7 +1086,7 @@ class TestDhcpLeaseRelay(base.BaseTestCase):
exists.return_value = False
self.unlink.side_effect = OSError
relay = dhcp_agent.DhcpLeaseRelay(None)
dhcp_agent.DhcpLeaseRelay(None)
self.unlink.assert_called_once_with(
cfg.CONF.dhcp_lease_relay_socket)
@ -1110,7 +1096,7 @@ class TestDhcpLeaseRelay(base.BaseTestCase):
with mock.patch('os.path.exists') as exists:
exists.return_value = False
relay = dhcp_agent.DhcpLeaseRelay(None)
dhcp_agent.DhcpLeaseRelay(None)
self.unlink.assert_called_once_with(
cfg.CONF.dhcp_lease_relay_socket)
@ -1121,7 +1107,7 @@ class TestDhcpLeaseRelay(base.BaseTestCase):
with mock.patch('os.path.exists') as exists:
exists.return_value = True
with testtools.ExpectedException(OSError):
relay = dhcp_agent.DhcpLeaseRelay(None)
dhcp_agent.DhcpLeaseRelay(None)
self.unlink.assert_called_once_with(
cfg.CONF.dhcp_lease_relay_socket)
@ -1175,14 +1161,6 @@ class TestDhcpLeaseRelay(base.BaseTestCase):
self.assertTrue(log.called)
def test_handler_other_exception(self):
network_id = 'cccccccc-cccc-cccc-cccc-cccccccccccc'
ip_address = '192.168.x.x'
lease_remaining = 120
json_rep = jsonutils.dumps(
dict(network_id=network_id,
lease_remaining=lease_remaining,
ip_address=ip_address))
handler = mock.Mock()
mock_sock = mock.Mock()
mock_sock.recv.side_effect = Exception

@ -17,7 +17,6 @@ import contextlib
import os
import mock
from oslo.config import cfg
import webob.exc
from quantum.api.v2 import attributes as attr

@ -111,8 +111,7 @@ class ResourceExtensionTest(base.BaseTestCase):
# anything that is below 200 or above 400 so we can't actually check
# it. It throws webtest.AppError instead.
try:
response = (
test_app.get("/tweedles/some_id/notimplemented_function"))
test_app.get("/tweedles/some_id/notimplemented_function")
# Shouldn't be reached
self.assertTrue(False)
except webtest.AppError:

@ -45,7 +45,6 @@ from quantum.openstack.common import log as logging
from quantum.openstack.common.notifier import api as notifier_api
from quantum.openstack.common.notifier import test_notifier
from quantum.openstack.common import uuidutils
from quantum.tests import base
from quantum.tests.unit import test_api_v2
from quantum.tests.unit import test_db_plugin
from quantum.tests.unit import test_extensions

@ -17,6 +17,7 @@
# @author: Mark McClain, DreamHost
import os
import sys
import mock
import testtools
@ -44,16 +45,16 @@ class TestPidfile(base.BaseTestCase):
self.os.O_CREAT = os.O_CREAT
self.os.O_RDWR = os.O_RDWR
p = daemon.Pidfile('thefile', 'python')
daemon.Pidfile('thefile', 'python')
self.os.open.assert_called_once_with('thefile', os.O_CREAT | os.O_RDWR)
self.fcntl.flock.assert_called_once_with(FAKE_FD, self.fcntl.LOCK_EX)
def test_init_open_fail(self):
self.os.open.side_effect = IOError
with mock.patch.object(daemon.sys, 'stderr') as stderr:
with mock.patch.object(daemon.sys, 'stderr'):
with testtools.ExpectedException(SystemExit):
p = daemon.Pidfile('thefile', 'python')
daemon.Pidfile('thefile', 'python')
sys.assert_has_calls([
mock.call.stderr.write(mock.ANY),
mock.call.exit(1)]
@ -126,7 +127,7 @@ class TestDaemon(base.BaseTestCase):
def test_fork_error(self):
self.os.fork.side_effect = lambda: OSError(1)
with mock.patch.object(daemon.sys, 'stderr') as stderr:
with mock.patch.object(daemon.sys, 'stderr'):
with testtools.ExpectedException(SystemExit):
d = daemon.Daemon('pidfile', 'stdin')
d._fork()
@ -174,7 +175,7 @@ class TestDaemon(base.BaseTestCase):
self.pidfile.return_value.is_running.return_value = True
d = daemon.Daemon('pidfile')
with mock.patch.object(daemon.sys, 'stderr') as stderr:
with mock.patch.object(daemon.sys, 'stderr'):
with mock.patch.object(d, 'daemonize') as daemonize:
with testtools.ExpectedException(SystemExit):
d.start()

@ -73,7 +73,7 @@ class TestProcessManager(base.BaseTestCase):
active.__get__ = mock.Mock(return_value=True)
manager = ep.ProcessManager(self.conf, 'uuid', namespace='ns')
with mock.patch.object(ep, 'ip_lib') as ip_lib:
with mock.patch.object(ep, 'ip_lib'):
manager.enable(callback)
self.assertFalse(callback.called)

@ -16,7 +16,6 @@
# under the License.
import mock
from oslo.config import cfg
from quantum.agent.common import config
from quantum.agent.dhcp_agent import DeviceManager
@ -60,9 +59,6 @@ class FakePort:
class TestBase(base.BaseTestCase):
def setUp(self):
super(TestBase, self).setUp()
root_helper_opt = [
cfg.StrOpt('root_helper', default='sudo'),
]
self.conf = config.setup_conf()
self.conf.register_opts(interface.OPTS)
config.register_root_helper(self.conf)
@ -316,15 +312,6 @@ class TestBridgeInterfaceDriver(TestBase):
self.conf.set_override('network_device_mtu', 9000)
self._test_plug(mtu=9000)
def test_unplug(self):
self.device_exists.return_value = True
with mock.patch('quantum.agent.linux.interface.LOG.debug') as log:
br = interface.BridgeInterfaceDriver(self.conf)
br.unplug('tap0')
log.assert_called_once()
self.execute.assert_has_calls(
[mock.call(['ip', 'link', 'delete', 'tap0'], 'sudo')])
def test_unplug_no_device(self):
self.device_exists.return_value = False
self.ip_dev().link.delete.side_effect = RuntimeError

Some files were not shown because too many files have changed in this diff Show More