Clean imports in code

This patch set modifies lines which are importing objects
instead of modules. As per openstack import guide lines, user should
import modules in a file not objects.

http://docs.openstack.org/developer/hacking/#imports

Change-Id: I5aa415966a94bec00257c4d34bd5633de7bb34c8
This commit is contained in:
Cao Xuan Hoang 2016-08-31 16:00:43 +07:00
parent cbd13373ab
commit f60ab6709a
24 changed files with 148 additions and 139 deletions

View File

@ -21,7 +21,7 @@ from neutron.agent.common import config
from neutron.common import config as common_config
from oslo_config import cfg
from oslo_log import log
from ryu.base.app_manager import AppManager
from ryu.base import app_manager
from ryu import cfg as ryu_cfg
from ryu.ofproto import ofproto_common
@ -29,8 +29,8 @@ from dragonflow._i18n import _LI, _LW, _
from dragonflow.common import common_params
from dragonflow.common import constants
from dragonflow.common import utils as df_utils
from dragonflow.controller.ryu_base_app import RyuDFAdapter
from dragonflow.controller.topology import Topology
from dragonflow.controller import ryu_base_app
from dragonflow.controller import topology
from dragonflow.db import api_nb
from dragonflow.db import db_store
from dragonflow.db.drivers import ovsdb_vswitch_impl
@ -72,8 +72,9 @@ class DfLocalController(object):
vswitch_api=self.vswitch_api,
db_store=self.db_store
)
app_mgr = AppManager.get_instance()
self.open_flow_app = app_mgr.instantiate(RyuDFAdapter, **kwargs)
app_mgr = app_manager.AppManager.get_instance()
self.open_flow_app = app_mgr.instantiate(ryu_base_app.RyuDFAdapter,
**kwargs)
self.topology = None
self.enable_selective_topo_dist = \
@ -84,7 +85,8 @@ class DfLocalController(object):
self.nb_api.initialize(db_ip=cfg.CONF.df.remote_db_ip,
db_port=cfg.CONF.df.remote_db_port)
self.vswitch_api.initialize()
self.topology = Topology(self, self.enable_selective_topo_dist)
self.topology = topology.Topology(self,
self.enable_selective_topo_dist)
self.vswitch_api.sync()
# both set_controller and del_controller will delete flows.

View File

@ -11,7 +11,7 @@
# under the License.
import eventlet
from eventlet.queue import Queue
from eventlet import queue
import sys
import time
import traceback
@ -35,7 +35,7 @@ LOG = logging.getLogger(__name__)
class PublisherService(object):
def __init__(self):
self._queue = Queue()
self._queue = queue.Queue()
self.publisher = self._get_publisher()
self.multiproc_subscriber = self._get_multiproc_subscriber()
self.db = df_utils.load_driver(

View File

@ -31,7 +31,7 @@ from ryu.ofproto import ether
from dragonflow.common import utils as df_utils
from dragonflow._i18n import _, _LI, _LE, _LW
from dragonflow.controller.common import constants as const
from dragonflow.controller.df_base_app import DFlowApp
from dragonflow.controller import df_base_app
DF_DHCP_OPTS = [
cfg.ListOpt('df_dns_servers',
@ -59,7 +59,7 @@ DHCP_ACK = 5
DHCP_CLASSLESS_ROUTE = 121
class DHCPApp(DFlowApp):
class DHCPApp(df_base_app.DFlowApp):
def __init__(self, *args, **kwargs):
super(DHCPApp, self).__init__(*args, **kwargs)
self.idle_timeout = 30

View File

@ -25,10 +25,10 @@ from ryu.ofproto import ether
import six
from dragonflow._i18n import _
from dragonflow.controller.common.arp_responder import ArpResponder
from dragonflow.controller.common import arp_responder
from dragonflow.controller.common import constants as const
from dragonflow.controller.common import utils
from dragonflow.controller.df_base_app import DFlowApp
from dragonflow.controller import df_base_app
DF_DNAT_APP_OPTS = [
@ -48,7 +48,7 @@ DF_DNAT_APP_OPTS = [
FIP_GW_RESOLVING_STATUS = 'resolving'
class DNATApp(DFlowApp):
class DNATApp(df_base_app.DFlowApp):
def __init__(self, *args, **kwargs):
super(DNATApp, self).__init__(*args, **kwargs)
@ -154,7 +154,7 @@ class DNATApp(DFlowApp):
# install floatingip arp responder flow rules
if netaddr.IPAddress(floatingip.get_ip_address()).version != 4:
return
ArpResponder(self.get_datapath(),
arp_responder.ArpResponder(self.get_datapath(),
None,
floatingip.get_ip_address(),
floatingip.get_mac_address(),
@ -164,7 +164,7 @@ class DNATApp(DFlowApp):
# install floatingip arp responder flow rules
if netaddr.IPAddress(floatingip.get_ip_address()).version != 4:
return
ArpResponder(self.get_datapath(),
arp_responder.ArpResponder(self.get_datapath(),
None,
floatingip.get_ip_address(),
floatingip.get_mac_address(),

View File

@ -23,15 +23,15 @@ from ryu.lib.packet import packet
from ryu.ofproto import ether
from dragonflow._i18n import _LE
from dragonflow.controller.common.arp_responder import ArpResponder
from dragonflow.controller.common import arp_responder
from dragonflow.controller.common import constants as const
from dragonflow.controller.common.icmp_responder import ICMPResponder
from dragonflow.controller.df_base_app import DFlowApp
from dragonflow.controller.common import icmp_responder
from dragonflow.controller import df_base_app
LOG = log.getLogger(__name__)
class L3App(DFlowApp):
class L3App(df_base_app.DFlowApp):
def __init__(self, *args, **kwargs):
super(L3App, self).__init__(*args, **kwargs)
self.idle_timeout = 30
@ -150,8 +150,9 @@ class L3App(DFlowApp):
# Add router ARP & ICMP responder for IPv4 Addresses
is_ipv4 = netaddr.IPAddress(dst_ip).version == 4
if is_ipv4:
ArpResponder(datapath, local_network_id, dst_ip, mac).add()
ICMPResponder(datapath, dst_ip, mac).add()
arp_responder.ArpResponder(
datapath, local_network_id, dst_ip, mac).add()
icmp_responder.ICMPResponder(datapath, dst_ip, mac).add()
# If router interface IP, send to output table
if is_ipv4:
@ -273,8 +274,9 @@ class L3App(DFlowApp):
mac = router_port.get_mac()
if netaddr.IPAddress(ip).version == 4:
ArpResponder(self.get_datapath(), local_network_id, ip).remove()
ICMPResponder(self.get_datapath(), ip, mac).remove()
arp_responder.ArpResponder(
self.get_datapath(), local_network_id, ip).remove()
icmp_responder.ICMPResponder(self.get_datapath(), ip, mac).remove()
match = parser.OFPMatch()
match.set_metadata(local_network_id)

View File

@ -16,13 +16,13 @@ from neutron_lib import constants as common_const
from ryu.lib.mac import haddr_to_bin
from ryu.ofproto import ether
from dragonflow.controller.common.arp_responder import ArpResponder
from dragonflow.controller.common import arp_responder
from dragonflow.controller.common import constants as const
from dragonflow.controller.common.icmp_responder import ICMPResponder
from dragonflow.controller.df_base_app import DFlowApp
from dragonflow.controller.common import icmp_responder
from dragonflow.controller import df_base_app
class L3ProactiveApp(DFlowApp):
class L3ProactiveApp(df_base_app.DFlowApp):
def __init__(self, *args, **kwargs):
super(L3ProactiveApp, self).__init__(*args, **kwargs)
@ -44,8 +44,10 @@ class L3ProactiveApp(DFlowApp):
# Add router ARP & ICMP responder for IPv4 Addresses
is_ipv4 = netaddr.IPAddress(dst_ip).version == 4
if is_ipv4:
ArpResponder(datapath, local_network_id, dst_ip, mac).add()
ICMPResponder(datapath, dst_ip, mac).add()
arp_responder.ArpResponder(datapath,
local_network_id,
dst_ip, mac).add()
icmp_responder.ICMPResponder(datapath, dst_ip, mac).add()
# If router interface IP, send to output table
if is_ipv4:
@ -184,8 +186,9 @@ class L3ProactiveApp(DFlowApp):
mac = router_port.get_mac()
if netaddr.IPAddress(ip).version == 4:
ArpResponder(self.get_datapath(), local_network_id, ip).remove()
ICMPResponder(self.get_datapath(), ip, mac).remove()
arp_responder.ArpResponder(self.get_datapath(),
local_network_id, ip).remove()
icmp_responder.ICMPResponder(self.get_datapath(), ip, mac).remove()
match = parser.OFPMatch()
match.set_metadata(local_network_id)

View File

@ -29,12 +29,11 @@ from oslo_utils import encodeutils
from neutron.agent.ovsdb.native import idlutils
from dragonflow._i18n import _, _LW, _LE
from dragonflow.common.exceptions import LogicalPortNotFoundByTunnelKey
from dragonflow.common.exceptions import NoRemoteIPProxyException
from dragonflow.common import exceptions
from dragonflow.common import utils as df_utils
from dragonflow.controller.common.arp_responder import ArpResponder
from dragonflow.controller.common import arp_responder
from dragonflow.controller.common import constants as const
from dragonflow.controller.df_base_app import DFlowApp
from dragonflow.controller import df_base_app
from dragonflow.db import api_nb
from ryu.lib.packet import arp
@ -72,7 +71,7 @@ options = [
]
class MetadataServiceApp(DFlowApp):
class MetadataServiceApp(df_base_app.DFlowApp):
def __init__(self, api, db_store=None, vswitch_api=None, nb_api=None):
super(MetadataServiceApp, self).__init__(
api,
@ -394,7 +393,7 @@ class MetadataServiceApp(DFlowApp):
]
def _create_arp_responder(self, mac):
self._arp_responder = ArpResponder(
self._arp_responder = arp_responder.ArpResponder(
self.get_datapath(),
None,
const.METADATA_SERVICE_IP,
@ -500,7 +499,7 @@ class DFMetadataProxyHandler(BaseMetadataProxyHandler):
def get_headers(self, req):
remote_addr = req.remote_addr
if not remote_addr:
raise NoRemoteIPProxyException()
raise exceptions.NoRemoteIPProxyException()
tunnel_key = int(netaddr.IPAddress(remote_addr) & ~0x80000000)
lport = self._get_logical_port_by_tunnel_key(tunnel_key)
headers = dict(req.headers)
@ -540,7 +539,7 @@ class DFMetadataProxyHandler(BaseMetadataProxyHandler):
for lport in lports:
if lport.get_tunnel_key() == tunnel_key:
return lport
raise LogicalPortNotFoundByTunnelKey(key=tunnel_key)
raise exceptions.LogicalPortNotFoundByTunnelKey(key=tunnel_key)
# Taken from Neurton: neutron/agent/metadata/agent.py
def _sign_instance_id(self, instance_id):

View File

@ -16,8 +16,8 @@
import eventlet
from oslo_log import log
import ryu.app.ofctl.api as ofctl_api
from ryu.app.ofctl.service import OfctlService
from ryu.base.app_manager import AppManager
from ryu.app.ofctl import service
from ryu.base import app_manager
import ryu.exception as ryu_exc
from dragonflow._i18n import _LE
@ -32,8 +32,8 @@ class OpenFlowSwitchMixin(object):
"""
def __init__(self, ryu_app):
app_mgr = AppManager.get_instance()
self.ofctl_app = app_mgr.instantiate(OfctlService)
app_mgr = app_manager.AppManager.get_instance()
self.ofctl_app = app_mgr.instantiate(service.OfctlService)
self.ofctl_app.start()
self._app = ryu_app

View File

@ -20,7 +20,7 @@ from ryu.ofproto import ether
from dragonflow._i18n import _LI
from dragonflow.controller.common import constants as const
from dragonflow.controller.df_base_app import DFlowApp
from dragonflow.controller import df_base_app
config.setup_logging()
@ -32,7 +32,7 @@ DHCP_CLIENT_PORT = 68
DHCP_SERVER_PORT = 67
class PortSecApp(DFlowApp):
class PortSecApp(df_base_app.DFlowApp):
def _add_flow_drop(self, priority, match):
drop_inst = None

View File

@ -27,8 +27,7 @@ import six
from dragonflow._i18n import _LI, _LW, _LE
from dragonflow.common import utils as df_utils
from dragonflow.db.db_common import DbUpdate, SEND_ALL_TOPIC, \
DB_SYNC_MINIMUM_INTERVAL
from dragonflow.db import db_common
from dragonflow.db import pub_sub_api
LOG = log.getLogger(__name__)
@ -118,7 +117,7 @@ class NbApi(object):
def _start_subsciber(self):
self.subscriber.initialize(self.db_change_callback)
self.subscriber.register_topic(SEND_ALL_TOPIC)
self.subscriber.register_topic(db_common.SEND_ALL_TOPIC)
publishers_ips = cfg.CONF.df.publishers_ips
uris = {'%s://%s:%s' % (
cfg.CONF.df.publisher_transport,
@ -138,8 +137,8 @@ class NbApi(object):
def _send_db_change_event(self, table, key, action, value, topic):
if self.use_pubsub:
if not self.enable_selective_topo_dist:
topic = SEND_ALL_TOPIC
update = DbUpdate(table, key, action, value, topic=topic)
topic = db_common.SEND_ALL_TOPIC
update = db_common.DbUpdate(table, key, action, value, topic=topic)
self.publisher.send_event(update)
eventlet.sleep(0)
@ -155,14 +154,14 @@ class NbApi(object):
self._read_db_changes_from_queue()
def db_change_callback(self, table, key, action, value, topic=None):
update = DbUpdate(table, key, action, value, topic=topic)
update = db_common.DbUpdate(table, key, action, value, topic=topic)
LOG.info(_LI("Pushing Update to Queue: %s"), update)
self._queue.put(update)
eventlet.sleep(0)
def _read_db_changes_from_queue(self):
sync_rate_limiter = df_utils.RateLimiter(
max_rate=1, time_unit=DB_SYNC_MINIMUM_INTERVAL)
max_rate=1, time_unit=db_common.DB_SYNC_MINIMUM_INTERVAL)
while True:
self.next_update = self._queue.get(block=True)
LOG.debug("Event update: %s", self.next_update)

View File

@ -17,8 +17,8 @@ import etcd
import eventlet
from oslo_log import log
import urllib3
from urllib3.connection import HTTPException, BaseSSLError
from urllib3.exceptions import ReadTimeoutError, ProtocolError
from urllib3 import connection
from urllib3 import exceptions
from dragonflow.common import exceptions as df_exceptions
from dragonflow.db import db_api
@ -47,20 +47,22 @@ def _error_catcher(self):
# FIXME: Ideally we'd like to include the url in the
# ReadTimeoutError but there is yet no clean way to
# get at it from this context.
raise ReadTimeoutError(self._pool, None, 'Read timed out.')
raise exceptions.ReadTimeoutError(
self._pool, None, 'Read timed out.')
except BaseSSLError as e:
except connection.BaseSSLError as e:
# FIXME: Is there a better way to differentiate between SSLErrors?
if 'read operation timed out' not in str(e): # Defensive:
# This shouldn't happen but just in case we're missing an edge
# case, let's avoid swallowing SSL errors.
raise
raise ReadTimeoutError(self._pool, None, 'Read timed out.')
raise exceptions.ReadTimeoutError(
self._pool, None, 'Read timed out.')
except HTTPException as e:
except connection.HTTPException as e:
# This includes IncompleteRead.
raise ProtocolError('Connection broken: %r' % e, e)
raise exceptions.ProtocolError('Connection broken: %r' % e, e)
except Exception:
# The response may not be closed but we're not going to use it anymore
# so close it now to ensure that the connection is released back to the

View File

@ -16,14 +16,13 @@
from neutron.agent.ovsdb import impl_idl
from neutron.agent.ovsdb.native import commands
from neutron.agent.ovsdb.native.commands import BaseCommand
from neutron.agent.ovsdb.native import connection
from neutron.agent.ovsdb.native import helpers
from neutron.agent.ovsdb.native import idlutils
from oslo_config import cfg
from ovs.db import idl
from ovs import poller
from ovs.vlog import Vlog
from ovs import vlog
import retrying
import six
import threading
@ -128,7 +127,7 @@ class OvsdbSwitchApi(api_vswitch.SwitchApi):
self.nb_api = nb_api
self.ovsdb_monitor = None
self.integration_bridge = cfg.CONF.df.integration_bridge
Vlog.init('dragonflow')
vlog.Vlog.init('dragonflow')
def initialize(self):
db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port))
@ -314,7 +313,7 @@ class OvsdbTunnelPort(OvsdbSwitchPort):
return self.chassis_id
class DelControllerCommand(BaseCommand):
class DelControllerCommand(commands.BaseCommand):
def __init__(self, api, bridge):
super(DelControllerCommand, self).__init__(api)
self.bridge = bridge
@ -324,7 +323,7 @@ class DelControllerCommand(BaseCommand):
br.controller = []
class SetControllerCommand(BaseCommand):
class SetControllerCommand(commands.BaseCommand):
def __init__(self, api, bridge, targets):
super(SetControllerCommand, self).__init__(api)
self.bridge = bridge
@ -341,7 +340,7 @@ class SetControllerCommand(BaseCommand):
br.controller = controllers
class SetControllerFailModeCommand(BaseCommand):
class SetControllerFailModeCommand(commands.BaseCommand):
def __init__(self, api, bridge, fail_mode):
super(SetControllerFailModeCommand, self).__init__(api)
self.bridge = bridge
@ -353,7 +352,7 @@ class SetControllerFailModeCommand(BaseCommand):
br.fail_mode = [self.fail_mode]
class DeleteSwitchPort(BaseCommand):
class DeleteSwitchPort(commands.BaseCommand):
def __init__(self, api, switch_port):
super(DeleteSwitchPort, self).__init__(api)
self.switch_port = switch_port
@ -376,7 +375,7 @@ class DeleteSwitchPort(BaseCommand):
self.api.idl.tables['Port'].rows[port.uuid].delete()
class AddTunnelPort(BaseCommand):
class AddTunnelPort(commands.BaseCommand):
def __init__(self, api, chassis):
super(AddTunnelPort, self).__init__(api)
self.chassis = chassis
@ -450,7 +449,7 @@ class OvsdbMonitor(object):
self._notify_update_local_interface(_interface, action)
class AddPatchPort(BaseCommand):
class AddPatchPort(commands.BaseCommand):
def __init__(self, api, bridge, port, remote_name):
super(AddPatchPort, self).__init__(api)
self.bridge = bridge

View File

@ -29,7 +29,6 @@
# installing it
import ctypes
from ctypes.util import find_library
import os
@ -67,7 +66,7 @@ def get_library_path():
path = test_path
break
if not path:
path = find_library('ramcloud')
path = ctypes.util.find_library('ramcloud')
return path
@ -115,8 +114,6 @@ def load_so():
ctypes.c_bool = c_bool_compat
from ctypes import POINTER
# argument types aliased to their names for sanity
# alphabetical order
address = ctypes.c_char_p
@ -133,14 +130,14 @@ def load_so():
keysOnly = ctypes.c_uint32
name = ctypes.c_char_p
nanoseconds = ctypes.c_uint64
rejectRules = POINTER(RejectRules)
rejectRules = ctypes.POINTER(RejectRules)
serviceLocator = ctypes.c_char_p
status = ctypes.c_int
table = ctypes.c_uint64
version = ctypes.c_uint64
serverId = ctypes.c_uint64
so.rc_connect.argtypes = [address, address, POINTER(client)]
so.rc_connect.argtypes = [address, address, ctypes.POINTER(client)]
so.rc_connect.restype = status
so.rc_disconnect.argtypes = [client]
@ -155,31 +152,34 @@ def load_so():
so.rc_getStatus.argtypes = []
so.rc_getStatus.restype = status
so.rc_getTableId.argtypes = [client, name, POINTER(table)]
so.rc_getTableId.argtypes = [client, name, ctypes.POINTER(table)]
so.rc_getTableId.restype = status
so.rc_enumerateTablePrepare.argtypes = [client, table, keysOnly,
POINTER(enumerationState)]
ctypes.POINTER(enumerationState)]
so.rc_enumerateTablePrepare.restype = None
so.rc_enumerateTableNext.argtypes = [client, enumerationState,
POINTER(keyLen), POINTER(enum_key),
POINTER(dataLength), POINTER(data)]
ctypes.POINTER(keyLen),
ctypes.POINTER(enum_key),
ctypes.POINTER(dataLength),
ctypes.POINTER(data)]
so.rc_enumerateTableNextrestype = status
so.rc_enumerateTableFinalize.argtypes = [enumerationState]
so.rc_enumerateTableFinalize.restype = None
so.rc_read.argtypes = [client, table, key, keyLength, rejectRules,
POINTER(version), buf, len, POINTER(len)]
ctypes.POINTER(version), buf, len,
ctypes.POINTER(len)]
so.rc_read.restype = status
so.rc_remove.argtypes = [client, table, key, keyLength, rejectRules,
POINTER(version)]
ctypes.POINTER(version)]
so.rc_remove.restype = status
so.rc_write.argtypes = [client, table, key, keyLength, buf, len,
rejectRules, POINTER(version)]
rejectRules, ctypes.POINTER(version)]
so.rc_write.restype = status
so.rc_testing_kill.argtypes = [client, table, key, keyLength]
@ -190,7 +190,7 @@ def load_so():
so.rc_testing_fill.restype = status
so.rc_testing_get_server_id.argtypes = [client, table, key, keyLength,
POINTER(serverId)]
ctypes.POINTER(serverId)]
so.rc_testing_get_server_id.restype = status
so.rc_testing_get_service_locator.argtypes = [client, table, key,

View File

@ -22,7 +22,7 @@ import six
from dragonflow._i18n import _LE, _LW
from dragonflow.db import db_api
from dragonflow.db.drivers.redis_mgt import RedisMgt
from dragonflow.db.drivers import redis_mgt
LOG = log.getLogger(__name__)
@ -39,7 +39,7 @@ class RedisDbDriver(db_api.DbApi):
def initialize(self, db_ip, db_port, **args):
# get remote ip port list
self.redis_mgt = RedisMgt.get_instance(db_ip, db_port)
self.redis_mgt = redis_mgt.RedisMgt.get_instance(db_ip, db_port)
self._update_server_list()
def _update_server_list(self):

View File

@ -21,7 +21,7 @@ import six
from dragonflow._i18n import _LI, _LE, _LW
from dragonflow.common import utils as df_utils
from dragonflow.db.db_common import DbUpdate
from dragonflow.db import db_common
from dragonflow.db.drivers import redis_calckey
LOG = log.getLogger(__name__)
@ -356,8 +356,9 @@ class RedisMgt(object):
if len(nodes) > 0:
if self.publisher is not None:
nodes_json = jsonutils.dumps(nodes)
update = DbUpdate('ha', 'nodes', 'set', nodes_json,
topic='redis')
update = db_common.DbUpdate('ha', 'nodes',
'set', nodes_json,
topic='redis')
self.publisher.send_event(update)
# process new nodes got

View File

@ -11,9 +11,9 @@
# under the License.
import kazoo
from kazoo.client import KazooClient
from kazoo.handlers.eventlet import SequentialEventletHandler
from kazoo.retry import KazooRetry
from kazoo import client
from kazoo.handlers import eventlet
from kazoo import retry
import six
from dragonflow.common import exceptions as df_exceptions
@ -57,12 +57,12 @@ class ZookeeperDbDriver(db_api.DbApi):
def _lazy_initialize(self):
if not self.client:
hosts = _parse_hosts(self.config.remote_db_hosts)
_handler = SequentialEventletHandler()
_retry = KazooRetry(max_tries=CLIENT_CONNECTION_RETRIES,
_handler = eventlet.SequentialEventletHandler()
_retry = retry.KazooRetry(max_tries=CLIENT_CONNECTION_RETRIES,
delay=0.5,
backoff=2,
sleep_func=_handler.sleep_func)
self.client = KazooClient(hosts=hosts,
self.client = client.KazooClient(hosts=hosts,
handler=_handler,
connection_retry=_retry)
self.client.start()

View File

@ -20,7 +20,7 @@ import redis
from dragonflow._i18n import _LE, _LW
from dragonflow.common import common_params
from dragonflow.db.drivers.redis_mgt import RedisMgt
from dragonflow.db.drivers import redis_mgt
from dragonflow.db import pub_sub_api
LOG = logging.getLogger(__name__)
@ -54,8 +54,9 @@ class RedisPublisherAgent(pub_sub_api.PublisherApi):
def initialize(self):
# find a publisher server node
super(RedisPublisherAgent, self).initialize()
self.redis_mgt = RedisMgt.get_instance(cfg.CONF.df.remote_db_ip,
cfg.CONF.df.remote_db_port)
self.redis_mgt = redis_mgt.RedisMgt.get_instance(
cfg.CONF.df.remote_db_ip,
cfg.CONF.df.remote_db_port)
self._update_client()
def _update_client(self):
@ -122,8 +123,9 @@ class RedisSubscriberAgent(pub_sub_api.SubscriberAgentBase):
def initialize(self, callback):
# find a subscriber server node and run daemon
super(RedisSubscriberAgent, self).initialize(callback)
self.redis_mgt = RedisMgt.get_instance(cfg.CONF.df.remote_db_ip,
cfg.CONF.df.remote_db_port)
self.redis_mgt = redis_mgt.RedisMgt.get_instance(
cfg.CONF.df.remote_db_ip,
cfg.CONF.df.remote_db_port)
self._update_client()
def process_ha(self):

View File

@ -18,7 +18,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from dragonflow._i18n import _LI, _LE
from dragonflow.common.exceptions import UnsupportedTransportException
from dragonflow.common import exceptions
from dragonflow.db import db_common
from dragonflow.db import pub_sub_api
@ -38,7 +38,7 @@ class ZMQPubSub(pub_sub_api.PubSubApi):
'transport': transport,
'expected': str(SUPPORTED_TRANSPORTS)
})
raise UnsupportedTransportException(transport=transport)
raise exceptions.UnsupportedTransportException(transport=transport)
self.subscriber = ZMQSubscriberAgent()
self.publisher = ZMQPublisherAgent()

View File

@ -27,7 +27,7 @@ import ryu.lib.packet
from dragonflow._i18n import _LI, _LE
from dragonflow.common import common_params
from dragonflow.common.utils import DFDaemon
from dragonflow.common import utils as d_utils
from dragonflow.tests.common import utils as test_utils
from dragonflow.tests.fullstack import test_objects as objects
@ -845,7 +845,7 @@ class PortThread(object):
"""
self.packet_handler = packet_handler
self.port = port
self.daemon = DFDaemon(is_not_light=True)
self.daemon = d_utils.DFDaemon(is_not_light=True)
self.is_working = False
self.thread_id = None

View File

@ -19,8 +19,8 @@ from oslo_config import cfg
from oslo_serialization import jsonutils
from dragonflow.common import utils as df_utils
from dragonflow.db.db_common import DbUpdate, SEND_ALL_TOPIC
from dragonflow.db.pub_sub_api import TableMonitor
from dragonflow.db import db_common
from dragonflow.db import pub_sub_api
from dragonflow.tests.common import utils as test_utils
from dragonflow.tests.fullstack import test_base
from dragonflow.tests.fullstack import test_objects as objects
@ -59,7 +59,7 @@ def get_subscriber(callback):
df_utils.DF_PUBSUB_DRIVER_NAMESPACE)
subscriber = pub_sub_driver.get_subscriber()
subscriber.initialize(callback)
subscriber.register_topic(SEND_ALL_TOPIC)
subscriber.register_topic(db_common.SEND_ALL_TOPIC)
uri = '%s://%s:%s' % (
cfg.CONF.df.publisher_transport,
'127.0.0.1',
@ -194,7 +194,8 @@ class TestPubSub(test_base.DFTestBase):
eventlet.sleep(2)
local_events_num = ns.events_num
action = "log"
update = DbUpdate('info', 'log', action, "test ev no diff ports value")
update = db_common.DbUpdate(
'info', 'log', action, "test ev no diff ports value")
publisher.send_event(update)
eventlet.sleep(1)
@ -229,7 +230,7 @@ class TestPubSub(test_base.DFTestBase):
eventlet.sleep(0.5)
local_events_num = self.events_num_t
action = "log"
update = DbUpdate(
update = db_common.DbUpdate(
'info',
'log',
action,
@ -242,7 +243,8 @@ class TestPubSub(test_base.DFTestBase):
no_topic_action = 'log'
other_topic = "Other-topic"
self.events_action_t = None
update = DbUpdate('info', None, no_topic_action, "No topic value")
update = db_common.DbUpdate(
'info', None, no_topic_action, "No topic value")
publisher.send_event(update, other_topic)
eventlet.sleep(1)
@ -269,14 +271,14 @@ class TestPubSub(test_base.DFTestBase):
subscriber = get_subscriber(_db_change_callback)
eventlet.sleep(2)
action = "log"
update = DbUpdate(
update = db_common.DbUpdate(
'info',
'log',
action,
"value"
)
update.action = action
update.topic = SEND_ALL_TOPIC
update.topic = db_common.SEND_ALL_TOPIC
publisher.send_event(update)
eventlet.sleep(1)
self.assertEqual(ns.events_action, action)
@ -289,7 +291,7 @@ class TestPubSub(test_base.DFTestBase):
subscriber.register_listen_address(uri)
eventlet.sleep(2)
update.action = action
update.topic = SEND_ALL_TOPIC
update.topic = db_common.SEND_ALL_TOPIC
ns.events_action = None
publisher2.send_event(update)
eventlet.sleep(1)
@ -302,12 +304,12 @@ class TestMultiprocPubSub(test_base.DFTestBase):
super(TestMultiprocPubSub, self).setUp()
self.do_test = cfg.CONF.df.enable_df_pub_sub
self.key = 'key-{}'.format(random.random())
self.event = DbUpdate(
self.event = db_common.DbUpdate(
'info',
None,
"log",
"TestMultiprocPubSub value",
topic=SEND_ALL_TOPIC,
topic=db_common.SEND_ALL_TOPIC,
)
self.subscriber = None
@ -373,7 +375,7 @@ class TestDbTableMonitors(test_base.DFTestBase):
})
def _create_monitor(self, table_name):
table_monitor = TableMonitor(
table_monitor = pub_sub_api.TableMonitor(
table_name,
self.nb_api.driver,
self.publisher,

View File

@ -11,7 +11,7 @@
# under the License.
from dragonflow.controller.common import constants as const
from dragonflow.tests.common.utils import OvsFlowsParser, wait_until_none
from dragonflow.tests.common import utils
from dragonflow.tests.fullstack import test_base
from dragonflow.tests.fullstack import test_objects as objects
@ -63,13 +63,13 @@ class TestTopology(test_base.DFTestBase):
def _remove_vm(self, vm):
vm_mac = vm.get_first_mac()
vm.close()
wait_until_none(
utils.wait_until_none(
lambda: 1 if any(self._get_vm_flows(vm_mac)) else None, timeout=60,
exception=Exception('VM flow was not deleted')
)
def _get_vm_flows(self, vm_mac):
ovs_flows_parser = OvsFlowsParser()
ovs_flows_parser = utils.OvsFlowsParser()
flows = ovs_flows_parser.dump(self.integration_bridge)
flows = [flow for flow in flows if
flow['table'] == str(const.ARP_TABLE) and

View File

@ -15,7 +15,7 @@
import mock
from dragonflow.db.drivers.redis_db_driver import RedisDbDriver
from dragonflow.db.drivers import redis_db_driver
from dragonflow.tests import base as tests_base
@ -23,7 +23,7 @@ class TestRedisDB(tests_base.BaseTestCase):
def setUp(self):
super(TestRedisDB, self).setUp()
self.RedisDbDriver = RedisDbDriver()
self.RedisDbDriver = redis_db_driver.RedisDbDriver()
def test_set_success(self):
client = mock.Mock()

View File

@ -14,12 +14,9 @@
import mock
from dragonflow.db.db_common import DbUpdate
from dragonflow.db import db_common
from dragonflow.db import pub_sub_api
from dragonflow.db.pubsub_drivers.redis_db_pubsub_driver \
import RedisPublisherAgent
from dragonflow.db.pubsub_drivers.redis_db_pubsub_driver \
import RedisSubscriberAgent
from dragonflow.db.pubsub_drivers import redis_db_pubsub_driver
from dragonflow.tests import base as tests_base
@ -27,29 +24,30 @@ class TestRedisPubSub(tests_base.BaseTestCase):
def setUp(self):
super(TestRedisPubSub, self).setUp()
self.RedisPublisherAgent = RedisPublisherAgent()
self.RedisSubscriberAgent = RedisSubscriberAgent()
self.RedisPublisherAgent = redis_db_pubsub_driver.RedisPublisherAgent()
self.RedisSubscriberAgent = \
redis_db_pubsub_driver.RedisSubscriberAgent()
def test_publish_success(self):
client = mock.Mock()
self.RedisPublisherAgent.client = client
client.publish.return_value = 1
update = DbUpdate("router",
"key",
"action",
"value",
topic='teststring')
update = db_common.DbUpdate("router",
"key",
"action",
"value",
topic='teststring')
result = self.RedisPublisherAgent.send_event(update, 'teststring')
self.assertIsNone(result)
def test_subscribe_success(self):
pubsub = mock.Mock()
self.RedisSubscriberAgent.pub_sub = pubsub
update = DbUpdate("router",
"key",
"action",
"value",
topic='teststring')
update = db_common.DbUpdate("router",
"key",
"action",
"value",
topic='teststring')
data = pub_sub_api.pack_message(update.to_dict())
self.RedisSubscriberAgent.pub_sub.listen.return_value = \
[{'type': 'message', 'data': data}]

View File

@ -12,7 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from random import Random
import random
class BloomFilter(object):
@ -43,8 +43,8 @@ class BloomFilter(object):
return self._array
def _get_probes(self, key):
random = Random(key).random
return (int(random() * self._num_bits)
rdm = random.Random(key).random
return (int(rdm() * self._num_bits)
for _probe in range(self._num_probes))
def update(self, keys):