diff --git a/dragonflow/controller/df_local_controller.py b/dragonflow/controller/df_local_controller.py index 92e27264a..908d26d24 100644 --- a/dragonflow/controller/df_local_controller.py +++ b/dragonflow/controller/df_local_controller.py @@ -21,7 +21,7 @@ from neutron.agent.common import config from neutron.common import config as common_config from oslo_config import cfg from oslo_log import log -from ryu.base.app_manager import AppManager +from ryu.base import app_manager from ryu import cfg as ryu_cfg from ryu.ofproto import ofproto_common @@ -29,8 +29,8 @@ from dragonflow._i18n import _LI, _LW, _ from dragonflow.common import common_params from dragonflow.common import constants from dragonflow.common import utils as df_utils -from dragonflow.controller.ryu_base_app import RyuDFAdapter -from dragonflow.controller.topology import Topology +from dragonflow.controller import ryu_base_app +from dragonflow.controller import topology from dragonflow.db import api_nb from dragonflow.db import db_store from dragonflow.db.drivers import ovsdb_vswitch_impl @@ -72,8 +72,9 @@ class DfLocalController(object): vswitch_api=self.vswitch_api, db_store=self.db_store ) - app_mgr = AppManager.get_instance() - self.open_flow_app = app_mgr.instantiate(RyuDFAdapter, **kwargs) + app_mgr = app_manager.AppManager.get_instance() + self.open_flow_app = app_mgr.instantiate(ryu_base_app.RyuDFAdapter, + **kwargs) self.topology = None self.enable_selective_topo_dist = \ @@ -84,7 +85,8 @@ class DfLocalController(object): self.nb_api.initialize(db_ip=cfg.CONF.df.remote_db_ip, db_port=cfg.CONF.df.remote_db_port) self.vswitch_api.initialize() - self.topology = Topology(self, self.enable_selective_topo_dist) + self.topology = topology.Topology(self, + self.enable_selective_topo_dist) self.vswitch_api.sync() # both set_controller and del_controller will delete flows. 
diff --git a/dragonflow/controller/df_publisher_service.py b/dragonflow/controller/df_publisher_service.py index d965e2f5b..b0ce5c639 100644 --- a/dragonflow/controller/df_publisher_service.py +++ b/dragonflow/controller/df_publisher_service.py @@ -11,7 +11,7 @@ # under the License. import eventlet -from eventlet.queue import Queue +from eventlet import queue import sys import time import traceback @@ -35,7 +35,7 @@ LOG = logging.getLogger(__name__) class PublisherService(object): def __init__(self): - self._queue = Queue() + self._queue = queue.Queue() self.publisher = self._get_publisher() self.multiproc_subscriber = self._get_multiproc_subscriber() self.db = df_utils.load_driver( diff --git a/dragonflow/controller/dhcp_app.py b/dragonflow/controller/dhcp_app.py index f440df116..a68c9d689 100644 --- a/dragonflow/controller/dhcp_app.py +++ b/dragonflow/controller/dhcp_app.py @@ -31,7 +31,7 @@ from ryu.ofproto import ether from dragonflow.common import utils as df_utils from dragonflow._i18n import _, _LI, _LE, _LW from dragonflow.controller.common import constants as const -from dragonflow.controller.df_base_app import DFlowApp +from dragonflow.controller import df_base_app DF_DHCP_OPTS = [ cfg.ListOpt('df_dns_servers', @@ -59,7 +59,7 @@ DHCP_ACK = 5 DHCP_CLASSLESS_ROUTE = 121 -class DHCPApp(DFlowApp): +class DHCPApp(df_base_app.DFlowApp): def __init__(self, *args, **kwargs): super(DHCPApp, self).__init__(*args, **kwargs) self.idle_timeout = 30 diff --git a/dragonflow/controller/dnat_app.py b/dragonflow/controller/dnat_app.py index bf44bf5eb..340cedf90 100644 --- a/dragonflow/controller/dnat_app.py +++ b/dragonflow/controller/dnat_app.py @@ -25,10 +25,10 @@ from ryu.ofproto import ether import six from dragonflow._i18n import _ -from dragonflow.controller.common.arp_responder import ArpResponder +from dragonflow.controller.common import arp_responder from dragonflow.controller.common import constants as const from dragonflow.controller.common import utils -from 
dragonflow.controller.df_base_app import DFlowApp +from dragonflow.controller import df_base_app DF_DNAT_APP_OPTS = [ @@ -48,7 +48,7 @@ DF_DNAT_APP_OPTS = [ FIP_GW_RESOLVING_STATUS = 'resolving' -class DNATApp(DFlowApp): +class DNATApp(df_base_app.DFlowApp): def __init__(self, *args, **kwargs): super(DNATApp, self).__init__(*args, **kwargs) @@ -154,7 +154,7 @@ class DNATApp(DFlowApp): # install floatingip arp responder flow rules if netaddr.IPAddress(floatingip.get_ip_address()).version != 4: return - ArpResponder(self.get_datapath(), + arp_responder.ArpResponder(self.get_datapath(), None, floatingip.get_ip_address(), floatingip.get_mac_address(), @@ -164,7 +164,7 @@ class DNATApp(DFlowApp): # install floatingip arp responder flow rules if netaddr.IPAddress(floatingip.get_ip_address()).version != 4: return - ArpResponder(self.get_datapath(), + arp_responder.ArpResponder(self.get_datapath(), None, floatingip.get_ip_address(), floatingip.get_mac_address(), diff --git a/dragonflow/controller/l3_app.py b/dragonflow/controller/l3_app.py index a8c24df8d..a1641b532 100644 --- a/dragonflow/controller/l3_app.py +++ b/dragonflow/controller/l3_app.py @@ -23,15 +23,15 @@ from ryu.lib.packet import packet from ryu.ofproto import ether from dragonflow._i18n import _LE -from dragonflow.controller.common.arp_responder import ArpResponder +from dragonflow.controller.common import arp_responder from dragonflow.controller.common import constants as const -from dragonflow.controller.common.icmp_responder import ICMPResponder -from dragonflow.controller.df_base_app import DFlowApp +from dragonflow.controller.common import icmp_responder +from dragonflow.controller import df_base_app LOG = log.getLogger(__name__) -class L3App(DFlowApp): +class L3App(df_base_app.DFlowApp): def __init__(self, *args, **kwargs): super(L3App, self).__init__(*args, **kwargs) self.idle_timeout = 30 @@ -150,8 +150,9 @@ class L3App(DFlowApp): # Add router ARP & ICMP responder for IPv4 Addresses is_ipv4 = 
netaddr.IPAddress(dst_ip).version == 4 if is_ipv4: - ArpResponder(datapath, local_network_id, dst_ip, mac).add() - ICMPResponder(datapath, dst_ip, mac).add() + arp_responder.ArpResponder( + datapath, local_network_id, dst_ip, mac).add() + icmp_responder.ICMPResponder(datapath, dst_ip, mac).add() # If router interface IP, send to output table if is_ipv4: @@ -273,8 +274,9 @@ class L3App(DFlowApp): mac = router_port.get_mac() if netaddr.IPAddress(ip).version == 4: - ArpResponder(self.get_datapath(), local_network_id, ip).remove() - ICMPResponder(self.get_datapath(), ip, mac).remove() + arp_responder.ArpResponder( + self.get_datapath(), local_network_id, ip).remove() + icmp_responder.ICMPResponder(self.get_datapath(), ip, mac).remove() match = parser.OFPMatch() match.set_metadata(local_network_id) diff --git a/dragonflow/controller/l3_proactive_app.py b/dragonflow/controller/l3_proactive_app.py index 26a93fe23..cd979999e 100644 --- a/dragonflow/controller/l3_proactive_app.py +++ b/dragonflow/controller/l3_proactive_app.py @@ -16,13 +16,13 @@ from neutron_lib import constants as common_const from ryu.lib.mac import haddr_to_bin from ryu.ofproto import ether -from dragonflow.controller.common.arp_responder import ArpResponder +from dragonflow.controller.common import arp_responder from dragonflow.controller.common import constants as const -from dragonflow.controller.common.icmp_responder import ICMPResponder -from dragonflow.controller.df_base_app import DFlowApp +from dragonflow.controller.common import icmp_responder +from dragonflow.controller import df_base_app -class L3ProactiveApp(DFlowApp): +class L3ProactiveApp(df_base_app.DFlowApp): def __init__(self, *args, **kwargs): super(L3ProactiveApp, self).__init__(*args, **kwargs) @@ -44,8 +44,10 @@ class L3ProactiveApp(DFlowApp): # Add router ARP & ICMP responder for IPv4 Addresses is_ipv4 = netaddr.IPAddress(dst_ip).version == 4 if is_ipv4: - ArpResponder(datapath, local_network_id, dst_ip, mac).add() - 
ICMPResponder(datapath, dst_ip, mac).add() + arp_responder.ArpResponder(datapath, + local_network_id, + dst_ip, mac).add() + icmp_responder.ICMPResponder(datapath, dst_ip, mac).add() # If router interface IP, send to output table if is_ipv4: @@ -184,8 +186,9 @@ class L3ProactiveApp(DFlowApp): mac = router_port.get_mac() if netaddr.IPAddress(ip).version == 4: - ArpResponder(self.get_datapath(), local_network_id, ip).remove() - ICMPResponder(self.get_datapath(), ip, mac).remove() + arp_responder.ArpResponder(self.get_datapath(), + local_network_id, ip).remove() + icmp_responder.ICMPResponder(self.get_datapath(), ip, mac).remove() match = parser.OFPMatch() match.set_metadata(local_network_id) diff --git a/dragonflow/controller/metadata_service_app.py b/dragonflow/controller/metadata_service_app.py index 8d2c0604e..3265cc44f 100644 --- a/dragonflow/controller/metadata_service_app.py +++ b/dragonflow/controller/metadata_service_app.py @@ -29,12 +29,11 @@ from oslo_utils import encodeutils from neutron.agent.ovsdb.native import idlutils from dragonflow._i18n import _, _LW, _LE -from dragonflow.common.exceptions import LogicalPortNotFoundByTunnelKey -from dragonflow.common.exceptions import NoRemoteIPProxyException +from dragonflow.common import exceptions from dragonflow.common import utils as df_utils -from dragonflow.controller.common.arp_responder import ArpResponder +from dragonflow.controller.common import arp_responder from dragonflow.controller.common import constants as const -from dragonflow.controller.df_base_app import DFlowApp +from dragonflow.controller import df_base_app from dragonflow.db import api_nb from ryu.lib.packet import arp @@ -72,7 +71,7 @@ options = [ ] -class MetadataServiceApp(DFlowApp): +class MetadataServiceApp(df_base_app.DFlowApp): def __init__(self, api, db_store=None, vswitch_api=None, nb_api=None): super(MetadataServiceApp, self).__init__( api, @@ -394,7 +393,7 @@ class MetadataServiceApp(DFlowApp): ] def _create_arp_responder(self, 
mac): - self._arp_responder = ArpResponder( + self._arp_responder = arp_responder.ArpResponder( self.get_datapath(), None, const.METADATA_SERVICE_IP, @@ -500,7 +499,7 @@ class DFMetadataProxyHandler(BaseMetadataProxyHandler): def get_headers(self, req): remote_addr = req.remote_addr if not remote_addr: - raise NoRemoteIPProxyException() + raise exceptions.NoRemoteIPProxyException() tunnel_key = int(netaddr.IPAddress(remote_addr) & ~0x80000000) lport = self._get_logical_port_by_tunnel_key(tunnel_key) headers = dict(req.headers) @@ -540,7 +539,7 @@ class DFMetadataProxyHandler(BaseMetadataProxyHandler): for lport in lports: if lport.get_tunnel_key() == tunnel_key: return lport - raise LogicalPortNotFoundByTunnelKey(key=tunnel_key) + raise exceptions.LogicalPortNotFoundByTunnelKey(key=tunnel_key) # Taken from Neurton: neutron/agent/metadata/agent.py def _sign_instance_id(self, instance_id): diff --git a/dragonflow/controller/ofswitch.py b/dragonflow/controller/ofswitch.py index f66308483..eff3e0fb3 100644 --- a/dragonflow/controller/ofswitch.py +++ b/dragonflow/controller/ofswitch.py @@ -16,8 +16,8 @@ import eventlet from oslo_log import log import ryu.app.ofctl.api as ofctl_api -from ryu.app.ofctl.service import OfctlService -from ryu.base.app_manager import AppManager +from ryu.app.ofctl import service +from ryu.base import app_manager import ryu.exception as ryu_exc from dragonflow._i18n import _LE @@ -32,8 +32,8 @@ class OpenFlowSwitchMixin(object): """ def __init__(self, ryu_app): - app_mgr = AppManager.get_instance() - self.ofctl_app = app_mgr.instantiate(OfctlService) + app_mgr = app_manager.AppManager.get_instance() + self.ofctl_app = app_mgr.instantiate(service.OfctlService) self.ofctl_app.start() self._app = ryu_app diff --git a/dragonflow/controller/portsec_app.py b/dragonflow/controller/portsec_app.py index c115e07ed..0c006bc3c 100644 --- a/dragonflow/controller/portsec_app.py +++ b/dragonflow/controller/portsec_app.py @@ -20,7 +20,7 @@ from ryu.ofproto 
import ether from dragonflow._i18n import _LI from dragonflow.controller.common import constants as const -from dragonflow.controller.df_base_app import DFlowApp +from dragonflow.controller import df_base_app config.setup_logging() @@ -32,7 +32,7 @@ DHCP_CLIENT_PORT = 68 DHCP_SERVER_PORT = 67 -class PortSecApp(DFlowApp): +class PortSecApp(df_base_app.DFlowApp): def _add_flow_drop(self, priority, match): drop_inst = None diff --git a/dragonflow/db/api_nb.py b/dragonflow/db/api_nb.py index 5538d6488..7f4691178 100644 --- a/dragonflow/db/api_nb.py +++ b/dragonflow/db/api_nb.py @@ -27,8 +27,7 @@ import six from dragonflow._i18n import _LI, _LW, _LE from dragonflow.common import utils as df_utils -from dragonflow.db.db_common import DbUpdate, SEND_ALL_TOPIC, \ - DB_SYNC_MINIMUM_INTERVAL +from dragonflow.db import db_common from dragonflow.db import pub_sub_api LOG = log.getLogger(__name__) @@ -118,7 +117,7 @@ class NbApi(object): def _start_subsciber(self): self.subscriber.initialize(self.db_change_callback) - self.subscriber.register_topic(SEND_ALL_TOPIC) + self.subscriber.register_topic(db_common.SEND_ALL_TOPIC) publishers_ips = cfg.CONF.df.publishers_ips uris = {'%s://%s:%s' % ( cfg.CONF.df.publisher_transport, @@ -138,8 +137,8 @@ class NbApi(object): def _send_db_change_event(self, table, key, action, value, topic): if self.use_pubsub: if not self.enable_selective_topo_dist: - topic = SEND_ALL_TOPIC - update = DbUpdate(table, key, action, value, topic=topic) + topic = db_common.SEND_ALL_TOPIC + update = db_common.DbUpdate(table, key, action, value, topic=topic) self.publisher.send_event(update) eventlet.sleep(0) @@ -155,14 +154,14 @@ class NbApi(object): self._read_db_changes_from_queue() def db_change_callback(self, table, key, action, value, topic=None): - update = DbUpdate(table, key, action, value, topic=topic) + update = db_common.DbUpdate(table, key, action, value, topic=topic) LOG.info(_LI("Pushing Update to Queue: %s"), update) self._queue.put(update) 
eventlet.sleep(0) def _read_db_changes_from_queue(self): sync_rate_limiter = df_utils.RateLimiter( - max_rate=1, time_unit=DB_SYNC_MINIMUM_INTERVAL) + max_rate=1, time_unit=db_common.DB_SYNC_MINIMUM_INTERVAL) while True: self.next_update = self._queue.get(block=True) LOG.debug("Event update: %s", self.next_update) diff --git a/dragonflow/db/drivers/etcd_db_driver.py b/dragonflow/db/drivers/etcd_db_driver.py index 9f70289a0..d40fc9efe 100644 --- a/dragonflow/db/drivers/etcd_db_driver.py +++ b/dragonflow/db/drivers/etcd_db_driver.py @@ -17,8 +17,8 @@ import etcd import eventlet from oslo_log import log import urllib3 -from urllib3.connection import HTTPException, BaseSSLError -from urllib3.exceptions import ReadTimeoutError, ProtocolError +from urllib3 import connection +from urllib3 import exceptions from dragonflow.common import exceptions as df_exceptions from dragonflow.db import db_api @@ -47,20 +47,22 @@ def _error_catcher(self): # FIXME: Ideally we'd like to include the url in the # ReadTimeoutError but there is yet no clean way to # get at it from this context. - raise ReadTimeoutError(self._pool, None, 'Read timed out.') + raise exceptions.ReadTimeoutError( + self._pool, None, 'Read timed out.') - except BaseSSLError as e: + except connection.BaseSSLError as e: # FIXME: Is there a better way to differentiate between SSLErrors? if 'read operation timed out' not in str(e): # Defensive: # This shouldn't happen but just in case we're missing an edge # case, let's avoid swallowing SSL errors. raise - raise ReadTimeoutError(self._pool, None, 'Read timed out.') + raise exceptions.ReadTimeoutError( + self._pool, None, 'Read timed out.') - except HTTPException as e: + except connection.HTTPException as e: # This includes IncompleteRead. 
- raise ProtocolError('Connection broken: %r' % e, e) + raise exceptions.ProtocolError('Connection broken: %r' % e, e) except Exception: # The response may not be closed but we're not going to use it anymore # so close it now to ensure that the connection is released back to the diff --git a/dragonflow/db/drivers/ovsdb_vswitch_impl.py b/dragonflow/db/drivers/ovsdb_vswitch_impl.py index 5f1a091d0..c55ac71f9 100644 --- a/dragonflow/db/drivers/ovsdb_vswitch_impl.py +++ b/dragonflow/db/drivers/ovsdb_vswitch_impl.py @@ -16,14 +16,13 @@ from neutron.agent.ovsdb import impl_idl from neutron.agent.ovsdb.native import commands -from neutron.agent.ovsdb.native.commands import BaseCommand from neutron.agent.ovsdb.native import connection from neutron.agent.ovsdb.native import helpers from neutron.agent.ovsdb.native import idlutils from oslo_config import cfg from ovs.db import idl from ovs import poller -from ovs.vlog import Vlog +from ovs import vlog import retrying import six import threading @@ -128,7 +127,7 @@ class OvsdbSwitchApi(api_vswitch.SwitchApi): self.nb_api = nb_api self.ovsdb_monitor = None self.integration_bridge = cfg.CONF.df.integration_bridge - Vlog.init('dragonflow') + vlog.Vlog.init('dragonflow') def initialize(self): db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port)) @@ -314,7 +313,7 @@ class OvsdbTunnelPort(OvsdbSwitchPort): return self.chassis_id -class DelControllerCommand(BaseCommand): +class DelControllerCommand(commands.BaseCommand): def __init__(self, api, bridge): super(DelControllerCommand, self).__init__(api) self.bridge = bridge @@ -324,7 +323,7 @@ class DelControllerCommand(BaseCommand): br.controller = [] -class SetControllerCommand(BaseCommand): +class SetControllerCommand(commands.BaseCommand): def __init__(self, api, bridge, targets): super(SetControllerCommand, self).__init__(api) self.bridge = bridge @@ -341,7 +340,7 @@ class SetControllerCommand(BaseCommand): br.controller = controllers -class 
SetControllerFailModeCommand(BaseCommand): +class SetControllerFailModeCommand(commands.BaseCommand): def __init__(self, api, bridge, fail_mode): super(SetControllerFailModeCommand, self).__init__(api) self.bridge = bridge @@ -353,7 +352,7 @@ class SetControllerFailModeCommand(BaseCommand): br.fail_mode = [self.fail_mode] -class DeleteSwitchPort(BaseCommand): +class DeleteSwitchPort(commands.BaseCommand): def __init__(self, api, switch_port): super(DeleteSwitchPort, self).__init__(api) self.switch_port = switch_port @@ -376,7 +375,7 @@ class DeleteSwitchPort(BaseCommand): self.api.idl.tables['Port'].rows[port.uuid].delete() -class AddTunnelPort(BaseCommand): +class AddTunnelPort(commands.BaseCommand): def __init__(self, api, chassis): super(AddTunnelPort, self).__init__(api) self.chassis = chassis @@ -450,7 +449,7 @@ class OvsdbMonitor(object): self._notify_update_local_interface(_interface, action) -class AddPatchPort(BaseCommand): +class AddPatchPort(commands.BaseCommand): def __init__(self, api, bridge, port, remote_name): super(AddPatchPort, self).__init__(api) self.bridge = bridge diff --git a/dragonflow/db/drivers/ramcloud.py b/dragonflow/db/drivers/ramcloud.py index 37959cde0..09f241fe7 100644 --- a/dragonflow/db/drivers/ramcloud.py +++ b/dragonflow/db/drivers/ramcloud.py @@ -29,7 +29,7 @@ # installing it import ctypes -from ctypes.util import find_library +import ctypes.util import os @@ -67,7 +67,7 @@ def get_library_path(): path = test_path break if not path: - path = find_library('ramcloud') + path = ctypes.util.find_library('ramcloud') return path @@ -115,8 +115,6 @@ def load_so(): ctypes.c_bool = c_bool_compat - from ctypes import POINTER - # argument types aliased to their names for sanity # alphabetical order address = ctypes.c_char_p @@ -133,14 +131,14 @@ def load_so(): keysOnly = ctypes.c_uint32 name = ctypes.c_char_p nanoseconds = ctypes.c_uint64 - rejectRules = POINTER(RejectRules) + rejectRules = ctypes.POINTER(RejectRules) serviceLocator = ctypes.c_char_p status =
ctypes.c_int table = ctypes.c_uint64 version = ctypes.c_uint64 serverId = ctypes.c_uint64 - so.rc_connect.argtypes = [address, address, POINTER(client)] + so.rc_connect.argtypes = [address, address, ctypes.POINTER(client)] so.rc_connect.restype = status so.rc_disconnect.argtypes = [client] @@ -155,31 +152,34 @@ def load_so(): so.rc_getStatus.argtypes = [] so.rc_getStatus.restype = status - so.rc_getTableId.argtypes = [client, name, POINTER(table)] + so.rc_getTableId.argtypes = [client, name, ctypes.POINTER(table)] so.rc_getTableId.restype = status so.rc_enumerateTablePrepare.argtypes = [client, table, keysOnly, - POINTER(enumerationState)] + ctypes.POINTER(enumerationState)] so.rc_enumerateTablePrepare.restype = None so.rc_enumerateTableNext.argtypes = [client, enumerationState, - POINTER(keyLen), POINTER(enum_key), - POINTER(dataLength), POINTER(data)] + ctypes.POINTER(keyLen), + ctypes.POINTER(enum_key), + ctypes.POINTER(dataLength), + ctypes.POINTER(data)] so.rc_enumerateTableNextrestype = status so.rc_enumerateTableFinalize.argtypes = [enumerationState] so.rc_enumerateTableFinalize.restype = None so.rc_read.argtypes = [client, table, key, keyLength, rejectRules, - POINTER(version), buf, len, POINTER(len)] + ctypes.POINTER(version), buf, len, + ctypes.POINTER(len)] so.rc_read.restype = status so.rc_remove.argtypes = [client, table, key, keyLength, rejectRules, - POINTER(version)] + ctypes.POINTER(version)] so.rc_remove.restype = status so.rc_write.argtypes = [client, table, key, keyLength, buf, len, - rejectRules, POINTER(version)] + rejectRules, ctypes.POINTER(version)] so.rc_write.restype = status so.rc_testing_kill.argtypes = [client, table, key, keyLength] @@ -190,7 +190,7 @@ def load_so(): so.rc_testing_fill.restype = status so.rc_testing_get_server_id.argtypes = [client, table, key, keyLength, - POINTER(serverId)] + ctypes.POINTER(serverId)] so.rc_testing_get_server_id.restype = status so.rc_testing_get_service_locator.argtypes = [client, table, key, diff 
--git a/dragonflow/db/drivers/redis_db_driver.py b/dragonflow/db/drivers/redis_db_driver.py index 296f25722..6cb042f29 100644 --- a/dragonflow/db/drivers/redis_db_driver.py +++ b/dragonflow/db/drivers/redis_db_driver.py @@ -22,7 +22,7 @@ import six from dragonflow._i18n import _LE, _LW from dragonflow.db import db_api -from dragonflow.db.drivers.redis_mgt import RedisMgt +from dragonflow.db.drivers import redis_mgt LOG = log.getLogger(__name__) @@ -39,7 +39,7 @@ class RedisDbDriver(db_api.DbApi): def initialize(self, db_ip, db_port, **args): # get remote ip port list - self.redis_mgt = RedisMgt.get_instance(db_ip, db_port) + self.redis_mgt = redis_mgt.RedisMgt.get_instance(db_ip, db_port) self._update_server_list() def _update_server_list(self): diff --git a/dragonflow/db/drivers/redis_mgt.py b/dragonflow/db/drivers/redis_mgt.py index c83ec9521..16dbae59d 100644 --- a/dragonflow/db/drivers/redis_mgt.py +++ b/dragonflow/db/drivers/redis_mgt.py @@ -21,7 +21,7 @@ import six from dragonflow._i18n import _LI, _LE, _LW from dragonflow.common import utils as df_utils -from dragonflow.db.db_common import DbUpdate +from dragonflow.db import db_common from dragonflow.db.drivers import redis_calckey LOG = log.getLogger(__name__) @@ -356,8 +356,9 @@ class RedisMgt(object): if len(nodes) > 0: if self.publisher is not None: nodes_json = jsonutils.dumps(nodes) - update = DbUpdate('ha', 'nodes', 'set', nodes_json, - topic='redis') + update = db_common.DbUpdate('ha', 'nodes', + 'set', nodes_json, + topic='redis') self.publisher.send_event(update) # process new nodes got diff --git a/dragonflow/db/drivers/zookeeper_db_driver.py b/dragonflow/db/drivers/zookeeper_db_driver.py index 10848ef05..5f139cc6a 100644 --- a/dragonflow/db/drivers/zookeeper_db_driver.py +++ b/dragonflow/db/drivers/zookeeper_db_driver.py @@ -11,9 +11,9 @@ # under the License. 
import kazoo -from kazoo.client import KazooClient -from kazoo.handlers.eventlet import SequentialEventletHandler -from kazoo.retry import KazooRetry +from kazoo import client +from kazoo.handlers import eventlet +from kazoo import retry import six from dragonflow.common import exceptions as df_exceptions @@ -57,12 +57,12 @@ class ZookeeperDbDriver(db_api.DbApi): def _lazy_initialize(self): if not self.client: hosts = _parse_hosts(self.config.remote_db_hosts) - _handler = SequentialEventletHandler() - _retry = KazooRetry(max_tries=CLIENT_CONNECTION_RETRIES, + _handler = eventlet.SequentialEventletHandler() + _retry = retry.KazooRetry(max_tries=CLIENT_CONNECTION_RETRIES, delay=0.5, backoff=2, sleep_func=_handler.sleep_func) - self.client = KazooClient(hosts=hosts, + self.client = client.KazooClient(hosts=hosts, handler=_handler, connection_retry=_retry) self.client.start() diff --git a/dragonflow/db/pubsub_drivers/redis_db_pubsub_driver.py b/dragonflow/db/pubsub_drivers/redis_db_pubsub_driver.py index 9d26138e9..287038666 100644 --- a/dragonflow/db/pubsub_drivers/redis_db_pubsub_driver.py +++ b/dragonflow/db/pubsub_drivers/redis_db_pubsub_driver.py @@ -20,7 +20,7 @@ import redis from dragonflow._i18n import _LE, _LW from dragonflow.common import common_params -from dragonflow.db.drivers.redis_mgt import RedisMgt +from dragonflow.db.drivers import redis_mgt from dragonflow.db import pub_sub_api LOG = logging.getLogger(__name__) @@ -54,8 +54,9 @@ class RedisPublisherAgent(pub_sub_api.PublisherApi): def initialize(self): # find a publisher server node super(RedisPublisherAgent, self).initialize() - self.redis_mgt = RedisMgt.get_instance(cfg.CONF.df.remote_db_ip, - cfg.CONF.df.remote_db_port) + self.redis_mgt = redis_mgt.RedisMgt.get_instance( + cfg.CONF.df.remote_db_ip, + cfg.CONF.df.remote_db_port) self._update_client() def _update_client(self): @@ -122,8 +123,9 @@ class RedisSubscriberAgent(pub_sub_api.SubscriberAgentBase): def initialize(self, callback): # find a 
subscriber server node and run daemon super(RedisSubscriberAgent, self).initialize(callback) - self.redis_mgt = RedisMgt.get_instance(cfg.CONF.df.remote_db_ip, - cfg.CONF.df.remote_db_port) + self.redis_mgt = redis_mgt.RedisMgt.get_instance( + cfg.CONF.df.remote_db_ip, + cfg.CONF.df.remote_db_port) self._update_client() def process_ha(self): diff --git a/dragonflow/db/pubsub_drivers/zmq_pubsub_driver.py b/dragonflow/db/pubsub_drivers/zmq_pubsub_driver.py index 1573b1935..a3f3c1ea9 100644 --- a/dragonflow/db/pubsub_drivers/zmq_pubsub_driver.py +++ b/dragonflow/db/pubsub_drivers/zmq_pubsub_driver.py @@ -18,7 +18,7 @@ from oslo_config import cfg from oslo_log import log as logging from dragonflow._i18n import _LI, _LE -from dragonflow.common.exceptions import UnsupportedTransportException +from dragonflow.common import exceptions from dragonflow.db import db_common from dragonflow.db import pub_sub_api @@ -38,7 +38,7 @@ class ZMQPubSub(pub_sub_api.PubSubApi): 'transport': transport, 'expected': str(SUPPORTED_TRANSPORTS) }) - raise UnsupportedTransportException(transport=transport) + raise exceptions.UnsupportedTransportException(transport=transport) self.subscriber = ZMQSubscriberAgent() self.publisher = ZMQPublisherAgent() diff --git a/dragonflow/tests/common/app_testing_objects.py b/dragonflow/tests/common/app_testing_objects.py index a044d7974..25766f3be 100644 --- a/dragonflow/tests/common/app_testing_objects.py +++ b/dragonflow/tests/common/app_testing_objects.py @@ -27,7 +27,7 @@ import ryu.lib.packet from dragonflow._i18n import _LI, _LE from dragonflow.common import common_params -from dragonflow.common.utils import DFDaemon +from dragonflow.common import utils as d_utils from dragonflow.tests.common import utils as test_utils from dragonflow.tests.fullstack import test_objects as objects @@ -845,7 +845,7 @@ class PortThread(object): """ self.packet_handler = packet_handler self.port = port - self.daemon = DFDaemon(is_not_light=True) + self.daemon = 
d_utils.DFDaemon(is_not_light=True) self.is_working = False self.thread_id = None diff --git a/dragonflow/tests/fullstack/test_pub_sub.py b/dragonflow/tests/fullstack/test_pub_sub.py index dbfdccdc3..eba7fdfb3 100644 --- a/dragonflow/tests/fullstack/test_pub_sub.py +++ b/dragonflow/tests/fullstack/test_pub_sub.py @@ -19,8 +19,8 @@ from oslo_config import cfg from oslo_serialization import jsonutils from dragonflow.common import utils as df_utils -from dragonflow.db.db_common import DbUpdate, SEND_ALL_TOPIC -from dragonflow.db.pub_sub_api import TableMonitor +from dragonflow.db import db_common +from dragonflow.db import pub_sub_api from dragonflow.tests.common import utils as test_utils from dragonflow.tests.fullstack import test_base from dragonflow.tests.fullstack import test_objects as objects @@ -59,7 +59,7 @@ def get_subscriber(callback): df_utils.DF_PUBSUB_DRIVER_NAMESPACE) subscriber = pub_sub_driver.get_subscriber() subscriber.initialize(callback) - subscriber.register_topic(SEND_ALL_TOPIC) + subscriber.register_topic(db_common.SEND_ALL_TOPIC) uri = '%s://%s:%s' % ( cfg.CONF.df.publisher_transport, '127.0.0.1', @@ -194,7 +194,8 @@ class TestPubSub(test_base.DFTestBase): eventlet.sleep(2) local_events_num = ns.events_num action = "log" - update = DbUpdate('info', 'log', action, "test ev no diff ports value") + update = db_common.DbUpdate( + 'info', 'log', action, "test ev no diff ports value") publisher.send_event(update) eventlet.sleep(1) @@ -229,7 +230,7 @@ class TestPubSub(test_base.DFTestBase): eventlet.sleep(0.5) local_events_num = self.events_num_t action = "log" - update = DbUpdate( + update = db_common.DbUpdate( 'info', 'log', action, @@ -242,7 +243,8 @@ class TestPubSub(test_base.DFTestBase): no_topic_action = 'log' other_topic = "Other-topic" self.events_action_t = None - update = DbUpdate('info', None, no_topic_action, "No topic value") + update = db_common.DbUpdate( + 'info', None, no_topic_action, "No topic value") publisher.send_event(update, 
other_topic) eventlet.sleep(1) @@ -269,14 +271,14 @@ class TestPubSub(test_base.DFTestBase): subscriber = get_subscriber(_db_change_callback) eventlet.sleep(2) action = "log" - update = DbUpdate( + update = db_common.DbUpdate( 'info', 'log', action, "value" ) update.action = action - update.topic = SEND_ALL_TOPIC + update.topic = db_common.SEND_ALL_TOPIC publisher.send_event(update) eventlet.sleep(1) self.assertEqual(ns.events_action, action) @@ -289,7 +291,7 @@ class TestPubSub(test_base.DFTestBase): subscriber.register_listen_address(uri) eventlet.sleep(2) update.action = action - update.topic = SEND_ALL_TOPIC + update.topic = db_common.SEND_ALL_TOPIC ns.events_action = None publisher2.send_event(update) eventlet.sleep(1) @@ -302,12 +304,12 @@ class TestMultiprocPubSub(test_base.DFTestBase): super(TestMultiprocPubSub, self).setUp() self.do_test = cfg.CONF.df.enable_df_pub_sub self.key = 'key-{}'.format(random.random()) - self.event = DbUpdate( + self.event = db_common.DbUpdate( 'info', None, "log", "TestMultiprocPubSub value", - topic=SEND_ALL_TOPIC, + topic=db_common.SEND_ALL_TOPIC, ) self.subscriber = None @@ -373,7 +375,7 @@ class TestDbTableMonitors(test_base.DFTestBase): }) def _create_monitor(self, table_name): - table_monitor = TableMonitor( + table_monitor = pub_sub_api.TableMonitor( table_name, self.nb_api.driver, self.publisher, diff --git a/dragonflow/tests/fullstack/test_topology.py b/dragonflow/tests/fullstack/test_topology.py index 1fa23ae6a..d8dcc1e25 100644 --- a/dragonflow/tests/fullstack/test_topology.py +++ b/dragonflow/tests/fullstack/test_topology.py @@ -11,7 +11,7 @@ # under the License. 
from dragonflow.controller.common import constants as const -from dragonflow.tests.common.utils import OvsFlowsParser, wait_until_none +from dragonflow.tests.common import utils from dragonflow.tests.fullstack import test_base from dragonflow.tests.fullstack import test_objects as objects @@ -63,13 +63,13 @@ class TestTopology(test_base.DFTestBase): def _remove_vm(self, vm): vm_mac = vm.get_first_mac() vm.close() - wait_until_none( + utils.wait_until_none( lambda: 1 if any(self._get_vm_flows(vm_mac)) else None, timeout=60, exception=Exception('VM flow was not deleted') ) def _get_vm_flows(self, vm_mac): - ovs_flows_parser = OvsFlowsParser() + ovs_flows_parser = utils.OvsFlowsParser() flows = ovs_flows_parser.dump(self.integration_bridge) flows = [flow for flow in flows if flow['table'] == str(const.ARP_TABLE) and diff --git a/dragonflow/tests/unit/test_redis_db.py b/dragonflow/tests/unit/test_redis_db.py index 10e174a30..fd5c5572f 100644 --- a/dragonflow/tests/unit/test_redis_db.py +++ b/dragonflow/tests/unit/test_redis_db.py @@ -15,7 +15,7 @@ import mock -from dragonflow.db.drivers.redis_db_driver import RedisDbDriver +from dragonflow.db.drivers import redis_db_driver from dragonflow.tests import base as tests_base @@ -23,7 +23,7 @@ class TestRedisDB(tests_base.BaseTestCase): def setUp(self): super(TestRedisDB, self).setUp() - self.RedisDbDriver = RedisDbDriver() + self.RedisDbDriver = redis_db_driver.RedisDbDriver() def test_set_success(self): client = mock.Mock() diff --git a/dragonflow/tests/unit/test_redis_pubsub.py b/dragonflow/tests/unit/test_redis_pubsub.py index 8d85a39bf..62510e27b 100644 --- a/dragonflow/tests/unit/test_redis_pubsub.py +++ b/dragonflow/tests/unit/test_redis_pubsub.py @@ -14,12 +14,9 @@ import mock -from dragonflow.db.db_common import DbUpdate +from dragonflow.db import db_common from dragonflow.db import pub_sub_api -from dragonflow.db.pubsub_drivers.redis_db_pubsub_driver \ - import RedisPublisherAgent -from 
dragonflow.db.pubsub_drivers.redis_db_pubsub_driver \ - import RedisSubscriberAgent +from dragonflow.db.pubsub_drivers import redis_db_pubsub_driver from dragonflow.tests import base as tests_base @@ -27,29 +24,30 @@ class TestRedisPubSub(tests_base.BaseTestCase): def setUp(self): super(TestRedisPubSub, self).setUp() - self.RedisPublisherAgent = RedisPublisherAgent() - self.RedisSubscriberAgent = RedisSubscriberAgent() + self.RedisPublisherAgent = redis_db_pubsub_driver.RedisPublisherAgent() + self.RedisSubscriberAgent = \ + redis_db_pubsub_driver.RedisSubscriberAgent() def test_publish_success(self): client = mock.Mock() self.RedisPublisherAgent.client = client client.publish.return_value = 1 - update = DbUpdate("router", - "key", - "action", - "value", - topic='teststring') + update = db_common.DbUpdate("router", + "key", + "action", + "value", + topic='teststring') result = self.RedisPublisherAgent.send_event(update, 'teststring') self.assertIsNone(result) def test_subscribe_success(self): pubsub = mock.Mock() self.RedisSubscriberAgent.pub_sub = pubsub - update = DbUpdate("router", - "key", - "action", - "value", - topic='teststring') + update = db_common.DbUpdate("router", + "key", + "action", + "value", + topic='teststring') data = pub_sub_api.pack_message(update.to_dict()) self.RedisSubscriberAgent.pub_sub.listen.return_value = \ [{'type': 'message', 'data': data}] diff --git a/dragonflow/utils/bloomfilter.py b/dragonflow/utils/bloomfilter.py index c29a8346f..24b612462 100644 --- a/dragonflow/utils/bloomfilter.py +++ b/dragonflow/utils/bloomfilter.py @@ -12,7 +12,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. 
-from random import Random +import random class BloomFilter(object): @@ -43,8 +43,8 @@ class BloomFilter(object): return self._array def _get_probes(self, key): - random = Random(key).random - return (int(random() * self._num_bits) + rdm = random.Random(key).random + return (int(rdm() * self._num_bits) for _probe in range(self._num_probes)) def update(self, keys):