Switch to neutron_lib pep8 factory

Change-Id: Ib42f6c2054bc9bdb82afa345833c1c95cd947fa7
This commit is contained in:
Anna Khmelnitsky 2017-03-23 10:35:13 -07:00
parent 8c2a736870
commit 58d3a43779
20 changed files with 107 additions and 90 deletions
gbpservice
contrib
nfp/configurator/drivers/loadbalancer/v2/haproxy
tests/unit/nfp/configurator/drivers/vpn
neutron
services
grouppolicy/drivers/cisco/apic
servicechain/plugins/ncp/node_drivers
tests/unit
nfp
tox.ini

@ -29,7 +29,6 @@ from octavia.i18n import _LE
from octavia.i18n import _LW
from oslo_config import cfg
import requests
import six
from gbpservice.contrib.nfp.configurator.drivers.loadbalancer.v2.haproxy.\
local_cert_manager import LocalCertManager
@ -85,7 +84,7 @@ class AmphoraAPIClient(rest_api_driver.AmphoraAPIClient):
headers['User-Agent'] = OCTAVIA_API_CLIENT
# Keep retrying
for a in six.moves.xrange(CONF.haproxy_amphora.connection_max_retries):
for a in range(CONF.haproxy_amphora.connection_max_retries):
try:
with warnings.catch_warnings():
warnings.filterwarnings(

@ -144,7 +144,7 @@ class VpnaasIpsecDriverTestCase(base.BaseTestCase):
mock_get.return_value = self.resp
mock_json.return_value = {'state': 'DOWN'}
state = self.driver.check_status(self.context, svc_context)
self.assertEqual(state, None)
self.assertIsNone(state)
class VpnGenericConfigDriverTestCase(base.BaseTestCase):

@ -1845,7 +1845,7 @@ class ApicMappingDriver(api.ResourceMappingDriver,
methods = [self.apic_manager.set_contract_for_epg,
self.apic_manager.unset_contract_for_epg]
for x in xrange(len(provided)):
for x in range(len(provided)):
for c in self.gbp_plugin.get_policy_rule_sets(
plugin_context, filters={'id': provided[x]}):
c_owner = self._tenant_by_sharing_policy(c)
@ -1853,7 +1853,7 @@ class ApicMappingDriver(api.ResourceMappingDriver,
for params in ptg_params:
methods[x](params[0], params[1], c, provider=True,
contract_owner=c_owner, transaction=None)
for x in xrange(len(consumed)):
for x in range(len(consumed)):
for c in self.gbp_plugin.get_policy_rule_sets(
plugin_context, filters={'id': consumed[x]}):
c_owner = self._tenant_by_sharing_policy(c)
@ -1903,14 +1903,14 @@ class ApicMappingDriver(api.ResourceMappingDriver,
methods = [self.apic_manager.set_contract_for_external_epg,
self.apic_manager.unset_contract_for_external_epg]
with self.apic_manager.apic.transaction(transaction) as trs:
for x in xrange(len(provided)):
for x in range(len(provided)):
for c in self._get_policy_rule_sets(plugin_context,
{'id': provided[x]}):
c = self.name_mapper.policy_rule_set(plugin_context, c)
methods[x](mapped_es, c, external_epg=mapped_ep,
owner=mapped_tenant, provided=True,
transaction=trs)
for x in xrange(len(consumed)):
for x in range(len(consumed)):
for c in self._get_policy_rule_sets(plugin_context,
{'id': consumed[x]}):
c = self.name_mapper.policy_rule_set(plugin_context, c)
@ -1932,7 +1932,7 @@ class ApicMappingDriver(api.ResourceMappingDriver,
methods = [self.apic_manager.ensure_subnet_created_on_apic,
self.apic_manager.ensure_subnet_deleted_on_apic]
with self.apic_manager.apic.transaction(transaction) as trs:
for x in xrange(len(subnets)):
for x in range(len(subnets)):
for s in subnets[x]:
methods[x](mapped_tenant, mapped_l2p, self._gateway_ip(s),
transaction=trs)

@ -31,6 +31,7 @@ from oslo_utils import excutils
import sqlalchemy as sa
from sqlalchemy.orm.exc import NoResultFound
from gbpservice._i18n import _
from gbpservice._i18n import _LE
from gbpservice._i18n import _LI
from gbpservice.common import utils
@ -902,7 +903,8 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
LOG.error(_LE("Service Targets are not created for the Node "
"of service_type %(service_type)s"),
{'service_type': service_type})
raise Exception("Service Targets are not created for the Node")
raise Exception(_("Service Targets are not created "
"for the Node"))
if (not consumer_service_targets and
not provider_service_targets):

@ -528,8 +528,8 @@ class TestGroupResources(GroupPolicyDbTestCase):
'policy_target_groups', data, ptg1['policy_target_group']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(
res['policy_target_group']['network_service_policy_id'], None)
self.assertIsNone(
res['policy_target_group']['network_service_policy_id'])
def _test_create_and_show_application_policy_group(self):
name = "apg1"
@ -596,8 +596,8 @@ class TestGroupResources(GroupPolicyDbTestCase):
data = {'policy_target_group': {'application_policy_group_id': None}}
req = self.new_update_request('policy_target_groups', data, ptg2['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(
None, res['policy_target_group']['application_policy_group_id'])
self.assertIsNone(
res['policy_target_group']['application_policy_group_id'])
apg = self._show_resource(apg_id, 'application_policy_groups')[
'application_policy_group']

@ -419,7 +419,7 @@ class NFPDBTestCase(SqlTestCase):
'monitoring_port_network']
for arg in non_mandatory_args:
self.assertIsNone(nf_device[arg])
self.assertEqual(None, nf_device['mgmt_port_id'])
self.assertIsNone(nf_device['mgmt_port_id'])
def test_get_network_function_device(self):
attrs = {

@ -2413,7 +2413,7 @@ class TestTopology(ApicAimTestCase):
# Verify network is not associated with address_scope.
net = self._show('networks', net_id)['network']
self.assertEqual(None, net['ipv4_address_scope'])
self.assertIsNone(net['ipv4_address_scope'])
# Associate subnetpool with address_scope.
data = {'subnetpool': {'address_scope_id': scope_id}}
@ -2429,7 +2429,7 @@ class TestTopology(ApicAimTestCase):
# Verify network is not associated with address_scope.
net = self._show('networks', net_id)['network']
self.assertEqual(None, net['ipv4_address_scope'])
self.assertIsNone(net['ipv4_address_scope'])
# Create router and add subnet.
router_id = self._make_router(
@ -3380,7 +3380,7 @@ class TestExternalConnectivityBase(object):
sub = self._make_subnet(
self.fmt, {'network': net}, '10.10.1.1', '10.10.1.0/24')['subnet']
port_calls = []
for x in xrange(0, 2):
for x in range(0, 2):
with self.port(subnet={'subnet': sub}) as p:
p = self._bind_port_to_host(p['port']['id'], 'host1')['port']
p['dns_name'] = ''
@ -3421,7 +3421,7 @@ class TestExternalConnectivityBase(object):
tenant_id=self.tenant_1)['network']
port_calls = []
subnets = []
for x in xrange(0, 2):
for x in range(0, 2):
sub = self._make_subnet(
self.fmt, {'network': net}, '10.10.%d.1' % x,
'10.10.%d.0/24' % x)
@ -4097,7 +4097,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
vlans = []
self._register_agent('h10', AGENT_CONF_OVS)
for x in xrange(0, 2):
for x in range(0, 2):
net = self._make_network(self.fmt, 'net%d' % x, True)['network']
epgs.append(self._net_2_epg(net))

@ -363,8 +363,8 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
self.assertEqual(l3p['subnet_prefix_length'],
int(subpool['default_prefixlen']))
else:
self.assertEqual(None, l3p['ip_pool'])
self.assertEqual(None, l3p['subnet_prefix_length'])
self.assertIsNone(l3p['ip_pool'])
self.assertIsNone(l3p['subnet_prefix_length'])
self.assertEqual(l3p['ip_version'],
subpool['ip_version'])
if compare_subnetpool_shared_attr:
@ -651,7 +651,7 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
expected_in_filters = []
expected_out_filters = []
for idx in xrange(0, len(policy_rules)):
for idx in range(0, len(policy_rules)):
pc = self.show_policy_classifier(
policy_rules[idx]['policy_classifier_id'])['policy_classifier']
fwd_filter = self.name_mapper.policy_rule(None,
@ -810,8 +810,8 @@ class TestL3Policy(AIMBaseTestCase):
new_subnetpools = implicit_subpool + [sp2['id']]
attrs = {'id': l3p['id'], subnetpools_version: new_subnetpools}
l3p = self.update_l3_policy(**attrs)['l3_policy']
self.assertEqual(None, l3p['ip_pool'])
self.assertEqual(None, l3p['subnet_prefix_length'])
self.assertIsNone(l3p['ip_pool'])
self.assertIsNone(l3p['subnet_prefix_length'])
self.assertItemsEqual(new_subnetpools, l3p[subnetpools_version])
attrs = {'id': l3p['id'], subnetpools_version: implicit_subpool}
l3p = self.update_l3_policy(**attrs)['l3_policy']

@ -4556,7 +4556,7 @@ class TestExternalSegment(ApicMappingTestCase):
else [es['tenant_id']])
l3p_list = []
for x in xrange(len(tenants)):
for x in range(len(tenants)):
l3p = self.create_l3_policy(
shared=False,
tenant_id=tenants[x],
@ -4569,7 +4569,7 @@ class TestExternalSegment(ApicMappingTestCase):
eps = [f(external_segments=[es['id']],
tenant_id=tenants[x],
expected_res_status=201)['external_policy']
for x in xrange(len(tenants))]
for x in range(len(tenants))]
mgr = self.driver.apic_manager
owner = self._tenant(es['tenant_id'], shared_es)
mgr.ensure_external_epg_created.reset_mock()
@ -4715,7 +4715,7 @@ class TestExternalSegment(ApicMappingTestCase):
# create L3-policies
l3p_list = []
for x in xrange(len(tenants)):
for x in range(len(tenants)):
l3p = self.create_l3_policy(
shared=False,
tenant_id=tenants[x],
@ -4728,7 +4728,7 @@ class TestExternalSegment(ApicMappingTestCase):
eps = [f(external_segments=[es['id']],
tenant_id=tenants[x],
expected_res_status=201)['external_policy']
for x in xrange(len(tenants))]
for x in range(len(tenants))]
mgr = self.driver.apic_manager
mgr.ensure_static_route_created.reset_mock()
mgr.ensure_external_epg_created.reset_mock()
@ -5010,7 +5010,7 @@ class TestExternalSegmentPreL3Out(TestExternalSegment):
mgr.apic.l3extOut.get_subtree.reset_mock()
mgr.apic.l3extOut.get_subtree.return_value = []
info = self.driver._query_l3out_info('l3out', 'bar_tenant')
self.assertEqual(None, info)
self.assertIsNone(info)
expected_calls = [
mock.call('bar_tenant', 'l3out'),
mock.call('common', 'l3out')]
@ -5202,7 +5202,7 @@ class TestExternalPolicy(ApicMappingTestCase):
'nexthop': '192.168.0.254'}])['external_segment']
for x in range(3)]
l3p_list = []
for x in xrange(len(es_list)):
for x in range(len(es_list)):
l3p = self.create_l3_policy(
shared=False,
tenant_id=shared_es and 'another' or es_list[x]['tenant_id'],
@ -5293,7 +5293,7 @@ class TestExternalPolicy(ApicMappingTestCase):
'nexthop': '192.168.0.254'}])['external_segment']
for x in range(3)]
l3p_list = []
for x in xrange(len(es_list)):
for x in range(len(es_list)):
l3p = self.create_l3_policy(
shared=False,
tenant_id=shared_es and 'another' or es_list[x]['tenant_id'],
@ -5424,7 +5424,7 @@ class TestExternalPolicy(ApicMappingTestCase):
'nexthop': '192.168.0.254'}])['external_segment']
for x in range(3)]
l3p_list = []
for x in xrange(len(es_list)):
for x in range(len(es_list)):
l3p = self.create_l3_policy(
shared=False,
tenant_id=shared_es and 'another' or es_list[x]['tenant_id'],
@ -5530,7 +5530,7 @@ class TestExternalPolicy(ApicMappingTestCase):
'nexthop': '192.168.0.254'}])['external_segment']
for x in range(3)]
l3p_list = []
for x in xrange(len(es_list)):
for x in range(len(es_list)):
l3p = self.create_l3_policy(
shared=False,
tenant_id=shared_es and 'another' or es_list[x]['tenant_id'],
@ -5734,7 +5734,7 @@ class TestExternalPolicy(ApicMappingTestCase):
for x in range(2)]
l3p_list = []
for x in xrange(len(tenants)):
for x in range(len(tenants)):
l3p = self.create_l3_policy(
shared=False,
tenant_id=tenants[x],

@ -95,7 +95,7 @@ class NodeCompositionPluginTestMixin(object):
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node_ids = []
for x in xrange(number_of_nodes):
for x in range(number_of_nodes):
node_ids.append(self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,

@ -233,7 +233,7 @@ class NFPNodeDriverTestCase(
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3', service_flavor='vyos')['service_profile']
node_ids = []
for x in xrange(number_of_nodes):
for x in range(number_of_nodes):
node_ids.append(self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_FW_CONFIG,

@ -19,6 +19,7 @@ import subprocess
from subprocess import CalledProcessError
import time
from gbpservice._i18n import _
from gbpservice.nfp.pecan import base_controller
LOG = logging.getLogger(__name__)
@ -190,7 +191,7 @@ class Controller(base_controller.BaseController):
msg4 = ("requests successfull for data: %s" % body)
LOG.info(msg4)
else:
raise Exception('VM is not reachable')
raise Exception(_('VM is not reachable'))
cache_ips.add(device_ip)
else:
if (resource in NFP_SERVICE_LIST):

@ -14,6 +14,7 @@ import six
from argparse import Namespace
from gbpservice._i18n import _
from gbpservice.nfp.core import context
from gbpservice.nfp.core import log as nfp_logging
from gbpservice.nfp.core import threadpool as core_tp
@ -37,7 +38,7 @@ def check_in_use(f):
def wrapped(self, *args, **kwargs):
if self.fired:
raise InUse("Executor in use")
raise InUse(_("Executor in use"))
return f(self, *args, **kwargs)
return wrapped

@ -20,6 +20,7 @@ import socket
from oslo_serialization import jsonutils
from gbpservice._i18n import _
from gbpservice.nfp.core import log as nfp_logging
LOG = nfp_logging.getLogger(__name__)
@ -49,7 +50,7 @@ class UnixHTTPConnection(httplib.HTTPConnection):
self.sock.connect(self.socket_path)
except socket.error as exc:
raise RestClientException(
"Caught exception socket.error : %s" % exc)
_("Caught exception socket.error : %s") % exc)
class UnixRestClient(object):
@ -66,10 +67,10 @@ class UnixRestClient(object):
return resp, content
except httplib2.ServerNotFoundError:
raise RestClientException("Server Not Found")
raise RestClientException(_("Server Not Found"))
except exceptions.Exception as e:
raise RestClientException("httplib response error %s" % (e))
raise RestClientException(_("httplib response error %s") % (e))
def send_request(self, path, method_type, request_method='http',
server_addr='127.0.0.1',
@ -109,33 +110,34 @@ class UnixRestClient(object):
if success_code.__contains__(resp.status):
return resp, content
elif resp.status == 400:
raise RestClientException("HTTPBadRequest: %s" % resp.reason)
raise RestClientException(_("HTTPBadRequest: %s") % resp.reason)
elif resp.status == 401:
raise RestClientException("HTTPUnauthorized: %s" % resp.reason)
raise RestClientException(_("HTTPUnauthorized: %s") % resp.reason)
elif resp.status == 403:
raise RestClientException("HTTPForbidden: %s" % resp.reason)
raise RestClientException(_("HTTPForbidden: %s") % resp.reason)
elif resp.status == 404:
raise RestClientException("HttpNotFound: %s" % resp.reason)
raise RestClientException(_("HttpNotFound: %s") % resp.reason)
elif resp.status == 405:
raise RestClientException(
"HTTPMethodNotAllowed: %s" % resp.reason)
_("HTTPMethodNotAllowed: %s") % resp.reason)
elif resp.status == 406:
raise RestClientException("HTTPNotAcceptable: %s" % resp.reason)
raise RestClientException(_("HTTPNotAcceptable: %s") % resp.reason)
elif resp.status == 408:
raise RestClientException("HTTPRequestTimeout: %s" % resp.reason)
raise RestClientException(
_("HTTPRequestTimeout: %s") % resp.reason)
elif resp.status == 409:
raise RestClientException("HTTPConflict: %s" % resp.reason)
raise RestClientException(_("HTTPConflict: %s") % resp.reason)
elif resp.status == 415:
raise RestClientException(
"HTTPUnsupportedMediaType: %s" % resp.reason)
_("HTTPUnsupportedMediaType: %s") % resp.reason)
elif resp.status == 417:
raise RestClientException(
"HTTPExpectationFailed: %s" % resp.reason)
_("HTTPExpectationFailed: %s") % resp.reason)
elif resp.status == 500:
raise RestClientException("HTTPServerError: %s" % resp.reason)
raise RestClientException(_("HTTPServerError: %s") % resp.reason)
else:
raise Exception('Unhandled Exception code: %s %s' % (resp.status,
resp.reason))
raise Exception(_('Unhandled Exception code: %(st)s %(reason)s') %
{'st': resp.status, 'reason': resp.reason})
def get(path):

@ -13,6 +13,7 @@
import exceptions
from gbpservice._i18n import _
from gbpservice.nfp.common import constants as nfp_constants
from gbpservice.nfp.core import log as nfp_logging
from gbpservice.nfp.lib import rest_client_over_unix as unix_rc
@ -75,33 +76,36 @@ class RestApi(object):
if success_code.__contains__(resp.status_code):
return resp
elif resp.status_code == 400:
raise RestClientException("HTTPBadRequest: %s" % resp.reason)
raise RestClientException(_("HTTPBadRequest: %s") % resp.reason)
elif resp.status_code == 401:
raise RestClientException("HTTPUnauthorized: %s" % resp.reason)
raise RestClientException(_("HTTPUnauthorized: %s") % resp.reason)
elif resp.status_code == 403:
raise RestClientException("HTTPForbidden: %s" % resp.reason)
raise RestClientException(_("HTTPForbidden: %s") % resp.reason)
elif resp.status_code == 404:
raise RestClientException("HttpNotFound: %s" % resp.reason)
raise RestClientException(_("HttpNotFound: %s") % resp.reason)
elif resp.status_code == 405:
raise RestClientException(
"HTTPMethodNotAllowed: %s" % resp.reason)
_("HTTPMethodNotAllowed: %s") % resp.reason)
elif resp.status_code == 406:
raise RestClientException("HTTPNotAcceptable: %s" % resp.reason)
raise RestClientException(_("HTTPNotAcceptable: %s") % resp.reason)
elif resp.status_code == 408:
raise RestClientException("HTTPRequestTimeout: %s" % resp.reason)
raise RestClientException(
_("HTTPRequestTimeout: %s") % resp.reason)
elif resp.status_code == 409:
raise RestClientException("HTTPConflict: %s" % resp.reason)
raise RestClientException(_("HTTPConflict: %s") % resp.reason)
elif resp.status_code == 415:
raise RestClientException(
"HTTPUnsupportedMediaType: %s" % resp.reason)
_("HTTPUnsupportedMediaType: %s") % resp.reason)
elif resp.status_code == 417:
raise RestClientException(
"HTTPExpectationFailed: %s" % resp.reason)
_("HTTPExpectationFailed: %s") % resp.reason)
elif resp.status_code == 500:
raise RestClientException("HTTPServerError: %s" % resp.reason)
raise RestClientException(_("HTTPServerError: %s") % resp.reason)
else:
raise RestClientException('Unhandled Exception code: %s %s' %
(resp.status_code, resp.reason))
raise RestClientException(_('Unhandled Exception code: '
'%(status)s %(reason)s') %
{'status': resp.status_code,
'reason': resp.reason})
return resp
def post(self, path, body, method_type):

@ -13,6 +13,7 @@
import oslo_messaging as messaging
from gbpservice._i18n import _
from gbpservice._i18n import _LE
from gbpservice._i18n import _LI
from gbpservice.nfp.common import constants as nfp_constants
@ -311,7 +312,7 @@ class DeviceOrchestrator(nfp_api.NfpEventHandler):
self.device_configuration_updated)
}
if event_id not in event_handler_mapping:
raise Exception("Invalid event ID")
raise Exception(_("Invalid event ID"))
else:
return event_handler_mapping[event_id]
@ -1683,7 +1684,7 @@ class ExceptionHandler(object):
ExceptionHandler.perform_periodic_health_check),
}
if event_id not in event_handler_mapping:
raise Exception("Invalid event ID")
raise Exception(_("Invalid event ID"))
else:
return event_handler_mapping[event_id]

@ -16,6 +16,7 @@ from neutron.db import api as db_api
from oslo_log import helpers as log_helpers
import oslo_messaging
from gbpservice._i18n import _
from gbpservice._i18n import _LE
from gbpservice._i18n import _LI
from gbpservice._i18n import _LW
@ -572,7 +573,7 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler):
"DELETE_NETWORK_FUNCTION_DB": self.delete_network_function_db
}
if event_id not in event_handler_mapping:
raise Exception("Invalid Event ID")
raise Exception(_("Invalid Event ID"))
else:
return event_handler_mapping[event_id]
@ -890,8 +891,8 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler):
def _validate_service_vendor(self, service_vendor):
if (service_vendor not in self.conf.orchestrator.supported_vendors):
raise Exception(
"The NFP Node driver does not support this service "
"profile with the service vendor %s." % service_vendor)
_("The NFP Node driver does not support this service "
"profile with the service vendor %s.") % service_vendor)
def create_network_function(self, context, network_function_info):
self._validate_create_service_input(context, network_function_info)
@ -1631,9 +1632,9 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler):
service_vendor = service_details.get('service_vendor')
if (not service_vendor or
not service_details.get('device_type')):
raise Exception("service_vendor or device_type not provided in "
raise Exception(_("service_vendor or device_type not provided in "
"service profile's service flavor field."
"Provided service profile: %s" % service_profile)
"Provided service profile: %s") % service_profile)
self._validate_service_vendor(service_vendor.lower())
@nfp_api.poll_event_desc(
@ -2729,7 +2730,7 @@ class ExceptionHandler(object):
ExceptionHandler.handle_delete_network_function_db_exception),
}
if event_id not in event_handler_mapping:
raise Exception("Invalid Event ID")
raise Exception(_("Invalid Event ID"))
else:
return event_handler_mapping[event_id]

@ -20,6 +20,7 @@ from neutronclient.v2_0 import client as neutron_client
from novaclient import client as nova_client
from novaclient import exceptions as nova_exc
from gbpservice._i18n import _
from gbpservice.nfp.core import log as nfp_logging
LOG = nfp_logging.getLogger(__name__)
@ -245,8 +246,9 @@ class NovaClient(OpenstackApi):
instance = nova.servers.get(instance_id)
if instance:
return instance.to_dict()
raise Exception("No instance with id %s found in db for tenant %s"
% (instance_id, tenant_id))
raise Exception(_("No instance with id %(id)s "
"found in db for tenant %(tenant)s")
% {'id': instance_id, 'tenant': tenant_id})
except Exception as ex:
err = ("Failed to read instance information from"
" Openstack Nova service's response"
@ -848,8 +850,11 @@ class NeutronClient(OpenstackApi):
endpoint_url=self.network_service)
return neutron.create_port(body=attr)['port']
except Exception as ex:
raise Exception("Port creation failed in network: %r of tenant: %r"
" Error: %s" % (net_id, tenant_id, ex))
raise Exception(_("Port creation failed in network: %(net)r "
"of tenant: %(tenant)r Error: %(error)s") %
{'net': net_id,
'tenant': tenant_id,
'error': ex})
def delete_port(self, token, port_id):
"""

@ -13,6 +13,7 @@
import eventlet
eventlet.monkey_patch()
from gbpservice._i18n import _
from gbpservice.nfp.core import log as nfp_logging
import os
from oslo_config import cfg as oslo_config
@ -153,10 +154,14 @@ class Connection(object):
if self._idle_count > self._idle_count_max:
self._end_time = time.time()
raise ConnectionIdleTimeOut(
"Connection (%d) - stime (%s) - etime (%s) - "
"idle_count (%d) idle_count_max(%d)" % (
self.identify(), self._start_time,
self._end_time, self._idle_count, self._idle_count_max))
_("Connection (%(conn)d) - "
"stime (%(start_time)s) - etime (%(end_time)s) - "
"idle_count (%(idle)d) idle_count_max(%(idle_max)d)") %
{'conn': self.identify(),
'start_time': self._start_time,
'end_time': self._end_time,
'idle': self._idle_count,
'idle_max': self._idle_count_max})
def idle(self):
self._tick()

10
tox.ini

@ -82,17 +82,13 @@ commands = python setup.py build_sphinx
# H401 docstring should not start with a space
# H402 one line docstring needs punctuation
# H405 multi line docstring summary not separated with an empty line
# H904 Wrap long lines in parentheses instead of a backslash
# TODO(marun) H404 multi line docstring should start with a summary
# N324 Prevent use of deprecated contextlib.nested
# N325 Python 3: Do not use xrange
# N326 Python 3: do not use basestring
# N327 Python 3: do not use dict.iteritems
ignore = E125,E126,E128,E129,E251,E265,E713,F402,F811,F812,H104,H237,H305,H307,H401,H402,H404,H405,H904,N324,N325,N326,N327,N341,N343
# N530 direct neutron imports not allowed
ignore = E125,E126,E128,E129,E251,E265,E713,F402,F811,F812,H104,H237,H305,H307,H401,H402,H404,H405,N530
show-source = true
builtins = _
exclude = .venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build,tools,.ropeproject,rally-scenarios,
[hacking]
import_exceptions = neutron.openstack.common.gettextutils
local-check-factory = neutron.hacking.checks.factory
local-check-factory = neutron_lib.hacking.checks.factory