From 4d58c761a0d7260588bedd9dd5e69d01cc971a67 Mon Sep 17 00:00:00 2001 From: dharmendra Date: Thu, 7 Apr 2016 21:19:58 +0530 Subject: [PATCH] Remove neutron agents codes Partial-Bug: 1515864 Co-Authored-By: gong yong sheng Co-Authored-By: dharmendra Change-Id: Id5ffba5767950a63097f99e0284a6215dc7e5476 --- tacker/agent/common/__init__.py | 0 tacker/agent/common/config.py | 121 ---- tacker/agent/linux/async_process.py | 221 ------- tacker/agent/linux/daemon.py | 147 ----- tacker/agent/linux/external_process.py | 100 --- tacker/agent/linux/interface.py | 454 -------------- tacker/agent/linux/ip_lib.py | 570 ------------------ tacker/agent/linux/polling.py | 115 ---- tacker/agent/rpc.py | 109 ---- tacker/tests/fake_notifier.py | 50 -- tacker/tests/functional/agent/__init__.py | 0 .../tests/functional/agent/linux/__init__.py | 0 tacker/tests/unit/agent/__init__.py | 0 tacker/tests/unit/agent/linux/__init__.py | 0 .../unit/agent/linux/test_async_process.py | 249 -------- 15 files changed, 2136 deletions(-) delete mode 100644 tacker/agent/common/__init__.py delete mode 100644 tacker/agent/common/config.py delete mode 100644 tacker/agent/linux/async_process.py delete mode 100644 tacker/agent/linux/daemon.py delete mode 100644 tacker/agent/linux/external_process.py delete mode 100644 tacker/agent/linux/interface.py delete mode 100644 tacker/agent/linux/ip_lib.py delete mode 100644 tacker/agent/linux/polling.py delete mode 100644 tacker/agent/rpc.py delete mode 100644 tacker/tests/fake_notifier.py delete mode 100644 tacker/tests/functional/agent/__init__.py delete mode 100644 tacker/tests/functional/agent/linux/__init__.py delete mode 100644 tacker/tests/unit/agent/__init__.py delete mode 100644 tacker/tests/unit/agent/linux/__init__.py delete mode 100644 tacker/tests/unit/agent/linux/test_async_process.py diff --git a/tacker/agent/common/__init__.py b/tacker/agent/common/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tacker/agent/common/config.py b/tacker/agent/common/config.py deleted file mode 100644 index a8943f182..000000000 --- a/tacker/agent/common/config.py +++ /dev/null @@ -1,121 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os - -from oslo_config import cfg - -from tacker.common import config -from tacker.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) - - -ROOT_HELPER_OPTS = [ - cfg.StrOpt('root_helper', default='sudo', - help=_('Root helper application.')), -] - -AGENT_STATE_OPTS = [ - cfg.FloatOpt('report_interval', default=30, - help=_('Seconds between nodes reporting state to server; ' - 'should be less than agent_down_time, best if it ' - 'is half or less than agent_down_time.')), -] - -INTERFACE_DRIVER_OPTS = [ - cfg.StrOpt('interface_driver', - help=_("The driver used to manage the virtual interface.")), -] - -USE_NAMESPACES_OPTS = [ - cfg.BoolOpt('use_namespaces', default=True, - help=_("Allow overlapping IP.")), -] - - -def get_log_args(conf, log_file_name): - cmd_args = [] - if conf.debug: - cmd_args.append('--debug') - if conf.verbose: - cmd_args.append('--verbose') - if (conf.log_dir or conf.log_file): - cmd_args.append('--log-file=%s' % log_file_name) - log_dir = None - if conf.log_dir and conf.log_file: - log_dir = os.path.dirname( - os.path.join(conf.log_dir, conf.log_file)) - elif conf.log_dir: - log_dir = conf.log_dir - elif conf.log_file: - log_dir = os.path.dirname(conf.log_file) - if log_dir: - cmd_args.append('--log-dir=%s' % log_dir) - else: - if conf.use_syslog: - cmd_args.append('--use-syslog') - if conf.syslog_log_facility: - cmd_args.append( - '--syslog-log-facility=%s' % conf.syslog_log_facility) - return cmd_args - - -def register_root_helper(conf): - # The first call is to ensure backward compatibility - conf.register_opts(ROOT_HELPER_OPTS) - conf.register_opts(ROOT_HELPER_OPTS, 'AGENT') - - -def register_agent_state_opts_helper(conf): - conf.register_opts(AGENT_STATE_OPTS, 'AGENT') - - -def register_interface_driver_opts_helper(conf): - conf.register_opts(INTERFACE_DRIVER_OPTS) - - -def register_use_namespaces_opts_helper(conf): - conf.register_opts(USE_NAMESPACES_OPTS) - - -def get_root_helper(conf): - root_helper = conf.AGENT.root_helper - if root_helper != 'sudo': - return root_helper - - root_helper = conf.root_helper - if root_helper != 'sudo': - LOG.deprecated(_('DEFAULT.root_helper is deprecated! Please move ' - 'root_helper configuration to [AGENT] section.')) - return root_helper - - return 'sudo' - - -def setup_conf(): - bind_opts = [ - cfg.StrOpt('state_path', - default='/var/lib/tacker', - help=_('Top-level directory for maintaining dhcp state')), - ] - - conf = cfg.ConfigOpts() - conf.register_opts(bind_opts) - return conf - -# add a logging setup method here for convenience -setup_logging = config.setup_logging diff --git a/tacker/agent/linux/async_process.py b/tacker/agent/linux/async_process.py deleted file mode 100644 index 6f75ded3f..000000000 --- a/tacker/agent/linux/async_process.py +++ /dev/null @@ -1,221 +0,0 @@ -# Copyright 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import eventlet -import eventlet.event -import eventlet.queue - -from tacker.agent.linux import utils -from tacker.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) - - -class AsyncProcessException(Exception): - pass - - -class AsyncProcess(object): - """Manages an asynchronous process. - - This class spawns a new process via subprocess and uses - greenthreads to read stderr and stdout asynchronously into queues - that can be read via repeatedly calling iter_stdout() and - iter_stderr(). - - If respawn_interval is non-zero, any error in communicating with - the managed process will result in the process and greenthreads - being cleaned up and the process restarted after the specified - interval. - - Example usage: - - >>> import time - >>> proc = AsyncProcess(['ping']) - >>> proc.start() - >>> time.sleep(5) - >>> proc.stop() - >>> for line in proc.iter_stdout(): - ... print line - """ - - def __init__(self, cmd, root_helper=None, respawn_interval=None): - """Constructor. - - :param cmd: The list of command arguments to invoke. - :param root_helper: Optional, utility to use when running shell cmds. - :param respawn_interval: Optional, the interval in seconds to wait - to respawn after unexpected process death. Respawn will - only be attempted if a value of 0 or greater is provided. - """ - self.cmd = cmd - self.root_helper = root_helper - if respawn_interval is not None and respawn_interval < 0: - raise ValueError(_('respawn_interval must be >= 0 if provided.')) - self.respawn_interval = respawn_interval - self._process = None - self._kill_event = None - self._reset_queues() - self._watchers = [] - - def _reset_queues(self): - self._stdout_lines = eventlet.queue.LightQueue() - self._stderr_lines = eventlet.queue.LightQueue() - - def start(self): - """Launch a process and monitor it asynchronously.""" - if self._kill_event: - raise AsyncProcessException(_('Process is already started')) - else: - LOG.debug(_('Launching async process [%s].'), self.cmd) - self._spawn() - - def stop(self): - """Halt the process and watcher threads.""" - if self._kill_event: - LOG.debug(_('Halting async process [%s].'), self.cmd) - self._kill() - else: - raise AsyncProcessException(_('Process is not running.')) - - def _spawn(self): - """Spawn a process and its watchers.""" - self._kill_event = eventlet.event.Event() - self._process, cmd = utils.create_process(self.cmd, - root_helper=self.root_helper) - self._watchers = [] - for reader in (self._read_stdout, self._read_stderr): - # Pass the stop event directly to the greenthread to - # ensure that assignment of a new event to the instance - # attribute does not prevent the greenthread from using - # the original event. - watcher = eventlet.spawn(self._watch_process, - reader, - self._kill_event) - self._watchers.append(watcher) - - def _kill(self, respawning=False): - """Kill the process and the associated watcher greenthreads. - - :param respawning: Optional, whether respawn will be subsequently - attempted. - """ - # Halt the greenthreads - self._kill_event.send() - - pid = self._get_pid_to_kill() - if pid: - self._kill_process(pid) - - if not respawning: - # Clear the kill event to ensure the process can be - # explicitly started again. - self._kill_event = None - - def _get_pid_to_kill(self): - pid = self._process.pid - # If root helper was used, two or more processes will be created: - # - # - a root helper process (e.g. sudo myscript) - # - possibly a rootwrap script (e.g. tacker-rootwrap) - # - a child process (e.g. myscript) - # - # Killing the root helper process will leave the child process - # running, re-parented to init, so the only way to ensure that both - # die is to target the child process directly. - if self.root_helper: - try: - pid = utils.find_child_pids(pid)[0] - except IndexError: - # Process is already dead - return None - while True: - try: - # We shouldn't have more than one child per process - # so keep getting the children of the first one - pid = utils.find_child_pids(pid)[0] - except IndexError: - # Last process in the tree, return it - break - return pid - - def _kill_process(self, pid): - try: - # A process started by a root helper will be running as - # root and need to be killed via the same helper. - utils.execute(['kill', '-9', pid], root_helper=self.root_helper) - except Exception as ex: - stale_pid = (isinstance(ex, RuntimeError) and - 'No such process' in str(ex)) - if not stale_pid: - LOG.exception(_('An error occurred while killing [%s].'), - self.cmd) - return False - return True - - def _handle_process_error(self): - """Kill the async process and respawn if necessary.""" - LOG.debug(_('Halting async process [%s] in response to an error.'), - self.cmd) - respawning = self.respawn_interval >= 0 - self._kill(respawning=respawning) - if respawning: - eventlet.sleep(self.respawn_interval) - LOG.debug(_('Respawning async process [%s].'), self.cmd) - self._spawn() - - def _watch_process(self, callback, kill_event): - while not kill_event.ready(): - try: - if not callback(): - break - except Exception: - LOG.exception(_('An error occurred while communicating ' - 'with async process [%s].'), self.cmd) - break - # Ensure that watching a process with lots of output does - # not block execution of other greenthreads. - eventlet.sleep() - # The kill event not being ready indicates that the loop was - # broken out of due to an error in the watched process rather - # than the loop condition being satisfied. - if not kill_event.ready(): - self._handle_process_error() - - def _read(self, stream, queue): - data = stream.readline() - if data: - data = data.strip() - queue.put(data) - return data - - def _read_stdout(self): - return self._read(self._process.stdout, self._stdout_lines) - - def _read_stderr(self): - return self._read(self._process.stderr, self._stderr_lines) - - def _iter_queue(self, queue): - while True: - try: - yield queue.get_nowait() - except eventlet.queue.Empty: - break - - def iter_stdout(self): - return self._iter_queue(self._stdout_lines) - - def iter_stderr(self): - return self._iter_queue(self._stderr_lines) diff --git a/tacker/agent/linux/daemon.py b/tacker/agent/linux/daemon.py deleted file mode 100644 index e4feb3c29..000000000 --- a/tacker/agent/linux/daemon.py +++ /dev/null @@ -1,147 +0,0 @@ -# Copyright 2012 New Dream Network, LLC (DreamHost) -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import atexit -import fcntl -import os -import signal -import sys - -from tacker.openstack.common import log as logging - -LOG = logging.getLogger(__name__) - - -class Pidfile(object): - def __init__(self, pidfile, procname, uuid=None): - self.pidfile = pidfile - self.procname = procname - self.uuid = uuid - try: - self.fd = os.open(pidfile, os.O_CREAT | os.O_RDWR) - fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - except IOError: - LOG.exception(_("Error while handling pidfile: %s"), pidfile) - sys.exit(1) - - def __str__(self): - return self.pidfile - - def unlock(self): - if not not fcntl.flock(self.fd, fcntl.LOCK_UN): - raise IOError(_('Unable to unlock pid file')) - - def write(self, pid): - os.ftruncate(self.fd, 0) - os.write(self.fd, "%d" % pid) - os.fsync(self.fd) - - def read(self): - try: - pid = int(os.read(self.fd, 128)) - os.lseek(self.fd, 0, os.SEEK_SET) - return pid - except ValueError: - return - - def is_running(self): - pid = self.read() - if not pid: - return False - - cmdline = '/proc/%s/cmdline' % pid - try: - with open(cmdline, "r") as f: - exec_out = f.readline() - return self.procname in exec_out and (not self.uuid or - self.uuid in exec_out) - except IOError: - return False - - -class Daemon(object): - """A generic daemon class. - - Usage: subclass the Daemon class and override the run() method - """ - def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', - stderr='/dev/null', procname='python', uuid=None): - self.stdin = stdin - self.stdout = stdout - self.stderr = stderr - self.procname = procname - self.pidfile = Pidfile(pidfile, procname, uuid) - - def _fork(self): - try: - pid = os.fork() - if pid > 0: - sys.exit(0) - except OSError: - LOG.exception(_('Fork failed')) - sys.exit(1) - - def daemonize(self): - """Daemonize process by doing Stevens double fork.""" - # fork first time - self._fork() - - # decouple from parent environment - os.chdir("/") - os.setsid() - os.umask(0) - - # fork second time - self._fork() - - # redirect standard file descriptors - sys.stdout.flush() - sys.stderr.flush() - stdin = open(self.stdin, 'r') - stdout = open(self.stdout, 'a+') - stderr = open(self.stderr, 'a+', 0) - os.dup2(stdin.fileno(), sys.stdin.fileno()) - os.dup2(stdout.fileno(), sys.stdout.fileno()) - os.dup2(stderr.fileno(), sys.stderr.fileno()) - - # write pidfile - atexit.register(self.delete_pid) - signal.signal(signal.SIGTERM, self.handle_sigterm) - self.pidfile.write(os.getpid()) - - def delete_pid(self): - os.remove(str(self.pidfile)) - - def handle_sigterm(self, signum, frame): - sys.exit(0) - - def start(self): - """Start the daemon.""" - - if self.pidfile.is_running(): - self.pidfile.unlock() - message = _('Pidfile %s already exist. Daemon already running?') - LOG.error(message, self.pidfile) - sys.exit(1) - - # Start the daemon - self.daemonize() - self.run() - - def run(self): - """Override this method when subclassing Daemon. - - start() will call this method after the process has daemonized. - """ - pass diff --git a/tacker/agent/linux/external_process.py b/tacker/agent/linux/external_process.py deleted file mode 100644 index 85765ee22..000000000 --- a/tacker/agent/linux/external_process.py +++ /dev/null @@ -1,100 +0,0 @@ -# Copyright 2012 New Dream Network, LLC (DreamHost) -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os - -from oslo_config import cfg - -from tacker.agent.linux import ip_lib -from tacker.agent.linux import utils -from tacker.openstack.common import log as logging - -LOG = logging.getLogger(__name__) - -OPTS = [ - cfg.StrOpt('external_pids', - default='$state_path/external/pids', - help=_('Location to store child pid files')), -] - -cfg.CONF.register_opts(OPTS) - - -class ProcessManager(object): - """An external process manager for Tacker spawned processes. - - Note: The manager expects uuid to be in cmdline. - """ - def __init__(self, conf, uuid, root_helper='sudo', namespace=None): - self.conf = conf - self.uuid = uuid - self.root_helper = root_helper - self.namespace = namespace - - def enable(self, cmd_callback): - if not self.active: - cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True)) - - ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace) - ip_wrapper.netns.execute(cmd) - - def disable(self): - pid = self.pid - - if self.active: - cmd = ['kill', '-9', pid] - utils.execute(cmd, self.root_helper) - elif pid: - LOG.debug(_('Process for %(uuid)s pid %(pid)d is stale, ignoring ' - 'command'), {'uuid': self.uuid, 'pid': pid}) - else: - LOG.debug(_('No process started for %s'), self.uuid) - - def get_pid_file_name(self, ensure_pids_dir=False): - """Returns the file name for a given kind of config file.""" - pids_dir = os.path.abspath(os.path.normpath(self.conf.external_pids)) - if ensure_pids_dir and not os.path.isdir(pids_dir): - os.makedirs(pids_dir, 0o755) - - return os.path.join(pids_dir, self.uuid + '.pid') - - @property - def pid(self): - """Last known pid for this external process spawned for this uuid.""" - file_name = self.get_pid_file_name() - msg = _('Error while reading %s') - - try: - with open(file_name, 'r') as f: - return int(f.read()) - except IOError: - msg = _('Unable to access %s') - except ValueError: - msg = _('Unable to convert value in %s') - - LOG.debug(msg, file_name) - return None - - @property - def active(self): - pid = self.pid - if pid is None: - return False - - cmdline = '/proc/%s/cmdline' % pid - try: - with open(cmdline, "r") as f: - return self.uuid in f.readline() - except IOError: - return False diff --git a/tacker/agent/linux/interface.py b/tacker/agent/linux/interface.py deleted file mode 100644 index d934f9510..000000000 --- a/tacker/agent/linux/interface.py +++ /dev/null @@ -1,454 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc - -import netaddr -from oslo_config import cfg -import six - -from tacker.agent.common import config -from tacker.agent.linux import ip_lib -from tacker.agent.linux import ovs_lib -from tacker.agent.linux import utils -from tacker.common import exceptions -from tacker.extensions import flavor -from tacker.openstack.common import importutils -from tacker.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) - -OPTS = [ - cfg.StrOpt('ovs_integration_bridge', - default='br-int', - help=_('Name of Open vSwitch bridge to use')), - cfg.BoolOpt('ovs_use_veth', - default=False, - help=_('Uses veth for an interface or not')), - cfg.IntOpt('network_device_mtu', - help=_('MTU setting for device.')), - cfg.StrOpt('meta_flavor_driver_mappings', - help=_('Mapping between flavor and LinuxInterfaceDriver')), - cfg.StrOpt('admin_user', - help=_("Admin username")), - cfg.StrOpt('admin_password', - help=_("Admin password"), - secret=True), - cfg.StrOpt('admin_tenant_name', - help=_("Admin tenant name")), - cfg.StrOpt('auth_url', - help=_("Authentication URL")), - cfg.StrOpt('auth_strategy', default='keystone', - help=_("The type of authentication to use")), - cfg.StrOpt('auth_region', - help=_("Authentication region")), -] - - -@six.add_metaclass(abc.ABCMeta) -class LinuxInterfaceDriver(object): - - # from linux IF_NAMESIZE - DEV_NAME_LEN = 14 - DEV_NAME_PREFIX = 'tap' - - def __init__(self, conf): - self.conf = conf - self.root_helper = config.get_root_helper(conf) - - def init_l3(self, device_name, ip_cidrs, namespace=None, - preserve_ips=None, gateway=None, extra_subnets=None): - """Set the L3 settings for the interface using data from the port. - - ip_cidrs: list of 'X.X.X.X/YY' strings - preserve_ips: list of ip cidrs that should not be removed from device - """ - if not preserve_ips: - preserve_ips = [] - if not extra_subnets: - extra_subnets = [] - device = ip_lib.IPDevice(device_name, - self.root_helper, - namespace=namespace) - - previous = {} - for address in device.addr.list(scope='global', filters=['permanent']): - previous[address['cidr']] = address['ip_version'] - - # add new addresses - for ip_cidr in ip_cidrs: - - net = netaddr.IPNetwork(ip_cidr) - # Convert to compact IPv6 address because the return values of - # "ip addr list" are compact. - if net.version == 6: - ip_cidr = str(net) - if ip_cidr in previous: - del previous[ip_cidr] - continue - - device.addr.add(net.version, ip_cidr, str(net.broadcast)) - - # clean up any old addresses - for ip_cidr, ip_version in previous.items(): - if ip_cidr not in preserve_ips: - device.addr.delete(ip_version, ip_cidr) - - if gateway: - device.route.add_gateway(gateway) - - new_onlink_routes = set(s['cidr'] for s in extra_subnets) - existing_onlink_routes = set(device.route.list_onlink_routes()) - for route in new_onlink_routes - existing_onlink_routes: - device.route.add_onlink_route(route) - for route in existing_onlink_routes - new_onlink_routes: - device.route.delete_onlink_route(route) - - def check_bridge_exists(self, bridge): - if not ip_lib.device_exists(bridge): - raise exceptions.BridgeDoesNotExist(bridge=bridge) - - def get_device_name(self, port): - return (self.DEV_NAME_PREFIX + port.id)[:self.DEV_NAME_LEN] - - @abc.abstractmethod - def plug(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None): - """Plug in the interface.""" - - @abc.abstractmethod - def unplug(self, device_name, bridge=None, namespace=None, prefix=None): - """Unplug the interface.""" - - -class NullDriver(LinuxInterfaceDriver): - def plug(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None): - pass - - def unplug(self, device_name, bridge=None, namespace=None, prefix=None): - pass - - -class OVSInterfaceDriver(LinuxInterfaceDriver): - """Driver for creating an internal interface on an OVS bridge.""" - - DEV_NAME_PREFIX = 'tap' - - def __init__(self, conf): - super(OVSInterfaceDriver, self).__init__(conf) - if self.conf.ovs_use_veth: - self.DEV_NAME_PREFIX = 'ns-' - - def _get_tap_name(self, dev_name, prefix=None): - if self.conf.ovs_use_veth: - dev_name = dev_name.replace(prefix or self.DEV_NAME_PREFIX, 'tap') - return dev_name - - def _ovs_add_port(self, bridge, device_name, port_id, mac_address, - internal=True): - cmd = ['ovs-vsctl', '--', '--if-exists', 'del-port', device_name, '--', - 'add-port', bridge, device_name] - if internal: - cmd += ['--', 'set', 'Interface', device_name, 'type=internal'] - cmd += ['--', 'set', 'Interface', device_name, - 'external-ids:iface-id=%s' % port_id, - '--', 'set', 'Interface', device_name, - 'external-ids:iface-status=active', - '--', 'set', 'Interface', device_name, - 'external-ids:attached-mac=%s' % mac_address] - utils.execute(cmd, self.root_helper) - - def plug(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None): - """Plug in the interface.""" - if not bridge: - bridge = self.conf.ovs_integration_bridge - - if not ip_lib.device_exists(device_name, - self.root_helper, - namespace=namespace): - - self.check_bridge_exists(bridge) - - ip = ip_lib.IPWrapper(self.root_helper) - tap_name = self._get_tap_name(device_name, prefix) - - if self.conf.ovs_use_veth: - # Create ns_dev in a namespace if one is configured. - root_dev, ns_dev = ip.add_veth(tap_name, - device_name, - namespace2=namespace) - else: - ns_dev = ip.device(device_name) - - internal = not self.conf.ovs_use_veth - self._ovs_add_port(bridge, tap_name, port_id, mac_address, - internal=internal) - - ns_dev.link.set_address(mac_address) - - if self.conf.network_device_mtu: - ns_dev.link.set_mtu(self.conf.network_device_mtu) - if self.conf.ovs_use_veth: - root_dev.link.set_mtu(self.conf.network_device_mtu) - - # Add an interface created by ovs to the namespace. - if not self.conf.ovs_use_veth and namespace: - namespace_obj = ip.ensure_namespace(namespace) - namespace_obj.add_device_to_namespace(ns_dev) - - ns_dev.link.set_up() - if self.conf.ovs_use_veth: - root_dev.link.set_up() - else: - LOG.info(_("Device %s already exists"), device_name) - - def unplug(self, device_name, bridge=None, namespace=None, prefix=None): - """Unplug the interface.""" - if not bridge: - bridge = self.conf.ovs_integration_bridge - - tap_name = self._get_tap_name(device_name, prefix) - self.check_bridge_exists(bridge) - ovs = ovs_lib.OVSBridge(bridge, self.root_helper) - - try: - ovs.delete_port(tap_name) - if self.conf.ovs_use_veth: - device = ip_lib.IPDevice(device_name, - self.root_helper, - namespace) - device.link.delete() - LOG.debug(_("Unplugged interface '%s'"), device_name) - except RuntimeError: - LOG.error(_("Failed unplugging interface '%s'"), - device_name) - - -class MidonetInterfaceDriver(LinuxInterfaceDriver): - - def plug(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None): - """Driver for creating an interface. - - This method is called by the Dhcp agent or by the L3 agent - when a new network is created - """ - if not ip_lib.device_exists(device_name, - self.root_helper, - namespace=namespace): - ip = ip_lib.IPWrapper(self.root_helper) - tap_name = device_name.replace(prefix or 'tap', 'tap') - - # Create ns_dev in a namespace if one is configured. - root_dev, ns_dev = ip.add_veth(tap_name, device_name, - namespace2=namespace) - - ns_dev.link.set_address(mac_address) - - # Add an interface created by ovs to the namespace. - namespace_obj = ip.ensure_namespace(namespace) - namespace_obj.add_device_to_namespace(ns_dev) - - ns_dev.link.set_up() - root_dev.link.set_up() - - cmd = ['mm-ctl', '--bind-port', port_id, device_name] - utils.execute(cmd, self.root_helper) - - else: - LOG.info(_("Device %s already exists"), device_name) - - def unplug(self, device_name, bridge=None, namespace=None, prefix=None): - # the port will be deleted by the dhcp agent that will call the plugin - device = ip_lib.IPDevice(device_name, - self.root_helper, - namespace) - try: - device.link.delete() - except RuntimeError: - LOG.error(_("Failed unplugging interface '%s'"), device_name) - LOG.debug(_("Unplugged interface '%s'"), device_name) - - ip_lib.IPWrapper( - self.root_helper, namespace).garbage_collect_namespace() - - -class IVSInterfaceDriver(LinuxInterfaceDriver): - """Driver for creating an internal interface on an IVS bridge.""" - - DEV_NAME_PREFIX = 'tap' - - def __init__(self, conf): - super(IVSInterfaceDriver, self).__init__(conf) - self.DEV_NAME_PREFIX = 'ns-' - - def _get_tap_name(self, dev_name, prefix=None): - dev_name = dev_name.replace(prefix or self.DEV_NAME_PREFIX, 'tap') - return dev_name - - def _ivs_add_port(self, device_name, port_id, mac_address): - cmd = ['ivs-ctl', 'add-port', device_name] - utils.execute(cmd, self.root_helper) - - def plug(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None): - """Plug in the interface.""" - if not ip_lib.device_exists(device_name, - self.root_helper, - namespace=namespace): - - ip = ip_lib.IPWrapper(self.root_helper) - tap_name = self._get_tap_name(device_name, prefix) - - root_dev, ns_dev = ip.add_veth(tap_name, device_name) - - self._ivs_add_port(tap_name, port_id, mac_address) - - ns_dev = ip.device(device_name) - ns_dev.link.set_address(mac_address) - - if self.conf.network_device_mtu: - ns_dev.link.set_mtu(self.conf.network_device_mtu) - root_dev.link.set_mtu(self.conf.network_device_mtu) - - if namespace: - namespace_obj = ip.ensure_namespace(namespace) - namespace_obj.add_device_to_namespace(ns_dev) - - ns_dev.link.set_up() - root_dev.link.set_up() - else: - LOG.info(_("Device %s already exists"), device_name) - - def unplug(self, device_name, bridge=None, namespace=None, prefix=None): - """Unplug the interface.""" - tap_name = self._get_tap_name(device_name, prefix) - try: - cmd = ['ivs-ctl', 'del-port', tap_name] - utils.execute(cmd, self.root_helper) - device = ip_lib.IPDevice(device_name, - self.root_helper, - namespace) - device.link.delete() - LOG.debug(_("Unplugged interface '%s'"), device_name) - except RuntimeError: - LOG.error(_("Failed unplugging interface '%s'"), - device_name) - - -class BridgeInterfaceDriver(LinuxInterfaceDriver): - """Driver for creating bridge interfaces.""" - - DEV_NAME_PREFIX = 'ns-' - - def plug(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None): - """Plugin the interface.""" - if not ip_lib.device_exists(device_name, - self.root_helper, - namespace=namespace): - ip = ip_lib.IPWrapper(self.root_helper) - - # Enable agent to define the prefix - if prefix: - tap_name = device_name.replace(prefix, 'tap') - else: - tap_name = device_name.replace(self.DEV_NAME_PREFIX, 'tap') - # Create ns_veth in a namespace if one is configured. - root_veth, ns_veth = ip.add_veth(tap_name, device_name, - namespace2=namespace) - ns_veth.link.set_address(mac_address) - - if self.conf.network_device_mtu: - root_veth.link.set_mtu(self.conf.network_device_mtu) - ns_veth.link.set_mtu(self.conf.network_device_mtu) - - root_veth.link.set_up() - ns_veth.link.set_up() - - else: - LOG.info(_("Device %s already exists"), device_name) - - def unplug(self, device_name, bridge=None, namespace=None, prefix=None): - """Unplug the interface.""" - device = ip_lib.IPDevice(device_name, self.root_helper, namespace) - try: - device.link.delete() - LOG.debug(_("Unplugged interface '%s'"), device_name) - except RuntimeError: - LOG.error(_("Failed unplugging interface '%s'"), - device_name) - - -class MetaInterfaceDriver(LinuxInterfaceDriver): - def __init__(self, conf): - super(MetaInterfaceDriver, self).__init__(conf) - from tackerclient.v2_0 import client - self.tacker = client.Client( - username=self.conf.admin_user, - password=self.conf.admin_password, - tenant_name=self.conf.admin_tenant_name, - auth_url=self.conf.auth_url, - auth_strategy=self.conf.auth_strategy, - region_name=self.conf.auth_region - ) - self.flavor_driver_map = {} - for net_flavor, driver_name in [ - driver_set.split(':') - for driver_set in - self.conf.meta_flavor_driver_mappings.split(',')]: - self.flavor_driver_map[net_flavor] = self._load_driver(driver_name) - - def _get_flavor_by_network_id(self, network_id): - network = self.tacker.show_network(network_id) - return network['network'][flavor.FLAVOR_NETWORK] - - def _get_driver_by_network_id(self, network_id): - net_flavor = self._get_flavor_by_network_id(network_id) - return self.flavor_driver_map[net_flavor] - - def _set_device_plugin_tag(self, network_id, device_name, namespace=None): - plugin_tag = self._get_flavor_by_network_id(network_id) - device = ip_lib.IPDevice(device_name, self.conf.root_helper, namespace) - device.link.set_alias(plugin_tag) - - def _get_device_plugin_tag(self, device_name, namespace=None): - device = ip_lib.IPDevice(device_name, self.conf.root_helper, namespace) - return device.link.alias - - def get_device_name(self, port): - driver = self._get_driver_by_network_id(port.network_id) - return driver.get_device_name(port) - - def plug(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None): - driver = self._get_driver_by_network_id(network_id) - ret = driver.plug(network_id, port_id, device_name, mac_address, - bridge=bridge, namespace=namespace, prefix=prefix) - self._set_device_plugin_tag(network_id, device_name, namespace) - return ret - - def unplug(self, device_name, bridge=None, namespace=None, prefix=None): - plugin_tag = self._get_device_plugin_tag(device_name, namespace) - driver = self.flavor_driver_map[plugin_tag] - return driver.unplug(device_name, bridge, namespace, prefix) - - def _load_driver(self, driver_provider): - LOG.debug(_("Driver location: %s"), driver_provider) - plugin_klass = importutils.import_class(driver_provider) - return plugin_klass(self.conf) diff --git a/tacker/agent/linux/ip_lib.py b/tacker/agent/linux/ip_lib.py deleted file mode 100644 index b483183f7..000000000 --- a/tacker/agent/linux/ip_lib.py +++ /dev/null @@ -1,570 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import netaddr -from oslo_config import cfg - -from tacker.agent.linux import utils -from tacker.common import exceptions - - -OPTS = [ - cfg.BoolOpt('ip_lib_force_root', - default=False, - help=_('Force ip_lib calls to use the root helper')), -] - - -VETH_MAX_NAME_LENGTH = 15 -LOOPBACK_DEVNAME = 'lo' -# NOTE(ethuleau): depend of the version of iproute2, the vlan -# interface details vary. -VLAN_INTERFACE_DETAIL = ['vlan protocol 802.1q', - 'vlan protocol 802.1Q', - 'vlan id'] - - -class SubProcessBase(object): - def __init__(self, root_helper=None, namespace=None): - self.root_helper = root_helper - self.namespace = namespace - try: - self.force_root = cfg.CONF.ip_lib_force_root - except cfg.NoSuchOptError: - # Only callers that need to force use of the root helper - # need to register the option. - self.force_root = False - - def _run(self, options, command, args): - if self.namespace: - return self._as_root(options, command, args) - elif self.force_root: - # Force use of the root helper to ensure that commands - # will execute in dom0 when running under XenServer/XCP. - return self._execute(options, command, args, self.root_helper) - else: - return self._execute(options, command, args) - - def _as_root(self, options, command, args, use_root_namespace=False): - if not self.root_helper: - raise exceptions.SudoRequired() - - namespace = self.namespace if not use_root_namespace else None - - return self._execute(options, - command, - args, - self.root_helper, - namespace) - - @classmethod - def _execute(cls, options, command, args, root_helper=None, - namespace=None): - opt_list = ['-%s' % o for o in options] - if namespace: - ip_cmd = ['ip', 'netns', 'exec', namespace, 'ip'] - else: - ip_cmd = ['ip'] - return utils.execute(ip_cmd + opt_list + [command] + list(args), - root_helper=root_helper) - - -class IPWrapper(SubProcessBase): - def __init__(self, root_helper=None, namespace=None): - super(IPWrapper, self).__init__(root_helper=root_helper, - namespace=namespace) - self.netns = IpNetnsCommand(self) - - def device(self, name): - return IPDevice(name, self.root_helper, self.namespace) - - def get_devices(self, exclude_loopback=False): - retval = [] - output = self._execute(['o', 'd'], 'link', ('list',), - self.root_helper, self.namespace) - for line in output.split('\n'): - if '<' not in line: - continue - tokens = line.split(' ', 2) - if len(tokens) == 3: - if any(v in tokens[2] for v in VLAN_INTERFACE_DETAIL): - delimiter = '@' - else: - delimiter = ':' - name = tokens[1].rpartition(delimiter)[0].strip() - - if exclude_loopback and name == LOOPBACK_DEVNAME: - continue - - retval.append(IPDevice(name, - self.root_helper, - self.namespace)) - return retval - - def add_tuntap(self, name, mode='tap'): - self._as_root('', 'tuntap', ('add', name, 'mode', mode)) - return IPDevice(name, self.root_helper, self.namespace) - - def add_veth(self, name1, name2, namespace2=None): - args = ['add', name1, 'type', 'veth', 'peer', 'name', name2] - - if namespace2 is None: - namespace2 = self.namespace - else: - self.ensure_namespace(namespace2) - args += ['netns', namespace2] - - self._as_root('', 'link', tuple(args)) - - return (IPDevice(name1, self.root_helper, self.namespace), - IPDevice(name2, self.root_helper, namespace2)) - - def ensure_namespace(self, name): - if not self.netns.exists(name): - ip = self.netns.add(name) - lo = ip.device(LOOPBACK_DEVNAME) - lo.link.set_up() - else: - ip = IPWrapper(self.root_helper, name) - return ip - - def namespace_is_empty(self): - return not self.get_devices(exclude_loopback=True) - - def garbage_collect_namespace(self): - """Conditionally destroy the namespace if it is empty.""" - if self.namespace and self.netns.exists(self.namespace): - if self.namespace_is_empty(): - self.netns.delete(self.namespace) - return True - return False - - def add_device_to_namespace(self, device): - if self.namespace: - device.link.set_netns(self.namespace) - - def add_vxlan(self, name, vni, group=None, dev=None, ttl=None, tos=None, - local=None, port=None, proxy=False): - cmd = ['add', name, 'type', 'vxlan', 'id', vni] - if group: - cmd.extend(['group', group]) - if dev: - cmd.extend(['dev', dev]) - if ttl: - cmd.extend(['ttl', ttl]) - if tos: - cmd.extend(['tos', tos]) - if local: - cmd.extend(['local', local]) - if proxy: - cmd.append('proxy') - # tuple: min,max - if port and len(port) == 2: - cmd.extend(['port', port[0], port[1]]) - elif port: - raise exceptions.NetworkVxlanPortRangeError(vxlan_range=port) - self._as_root('', 'link', cmd) - return (IPDevice(name, self.root_helper, self.namespace)) - - @classmethod - def get_namespaces(cls, root_helper): - output = cls._execute('', 'netns', ('list',), root_helper=root_helper) - return [l.strip() for l in output.split('\n')] - - -class IpRule(IPWrapper): - def add_rule_from(self, ip, table, rule_pr): - args = ['add', 'from', ip, 'lookup', table, 'priority', rule_pr] - ip = self._as_root('', 'rule', tuple(args)) - return ip - - def delete_rule_priority(self, rule_pr): - args = ['del', 'priority', rule_pr] - ip = self._as_root('', 'rule', tuple(args)) - return ip - - -class IPDevice(SubProcessBase): - def __init__(self, name, root_helper=None, namespace=None): - super(IPDevice, self).__init__(root_helper=root_helper, - namespace=namespace) - self.name = name - self.link = IpLinkCommand(self) - self.addr = IpAddrCommand(self) - self.route = IpRouteCommand(self) - self.neigh = IpNeighCommand(self) - - def __eq__(self, other): - return (other is not None and self.name == other.name - and self.namespace == other.namespace) - - def __str__(self): - return self.name - - -class IpCommandBase(object): - COMMAND = '' - - def __init__(self, parent): - self._parent = parent - - def _run(self, *args, **kwargs): - return self._parent._run(kwargs.get('options', []), self.COMMAND, args) - - def _as_root(self, *args, **kwargs): - return self._parent._as_root(kwargs.get('options', []), - self.COMMAND, - args, - kwargs.get('use_root_namespace', False)) - - -class IpDeviceCommandBase(IpCommandBase): - @property - def name(self): - return self._parent.name - - -class IpLinkCommand(IpDeviceCommandBase): - COMMAND = 'link' - - def set_address(self, mac_address): - self._as_root('set', self.name, 'address', mac_address) - - def set_mtu(self, mtu_size): - self._as_root('set', self.name, 'mtu', mtu_size) - - def set_up(self): - self._as_root('set', self.name, 'up') - - def set_down(self): - self._as_root('set', self.name, 'down') - - def set_netns(self, namespace): - self._as_root('set', self.name, 'netns', namespace) - self._parent.namespace = namespace - - def set_name(self, name): - self._as_root('set', self.name, 'name', name) - self._parent.name = name - - def set_alias(self, alias_name): - self._as_root('set', self.name, 'alias', alias_name) - - def delete(self): - self._as_root('delete', self.name) - - @property - def address(self): - return self.attributes.get('link/ether') - - @property - def state(self): - return self.attributes.get('state') - - @property - def mtu(self): - return self.attributes.get('mtu') - - @property - def qdisc(self): - return self.attributes.get('qdisc') - - @property - def qlen(self): - return self.attributes.get('qlen') - - @property - def alias(self): - return self.attributes.get('alias') - - @property - def attributes(self): - return self._parse_line(self._run('show', self.name, options='o')) - - def _parse_line(self, value): - if not value: - return {} - - device_name, settings = value.replace("\\", '').split('>', 1) - tokens = settings.split() - keys = tokens[::2] - values = [int(v) if v.isdigit() else v for v in tokens[1::2]] - - retval = dict(zip(keys, values)) - return retval - - -class IpAddrCommand(IpDeviceCommandBase): - COMMAND = 'addr' - - def add(self, ip_version, cidr, broadcast, scope='global'): - self._as_root('add', - cidr, - 'brd', - broadcast, - 'scope', - scope, - 'dev', - self.name, - options=[ip_version]) - - def delete(self, ip_version, cidr): - self._as_root('del', - cidr, - 'dev', - self.name, - options=[ip_version]) - - def flush(self): - self._as_root('flush', self.name) - - def list(self, scope=None, to=None, filters=None): - if filters is None: - filters = [] - - retval = [] - - if scope: - filters += ['scope', scope] - if to: - filters += ['to', to] - - for line in self._run('show', self.name, *filters).split('\n'): - line = line.strip() - if not line.startswith('inet'): - continue - parts = line.split() - if parts[0] == 'inet6': - version = 6 - scope = parts[3] - broadcast = '::' - else: - version = 4 - if parts[2] == 'brd': - broadcast = parts[3] - scope = parts[5] - else: - # sometimes output of 'ip a' might look like: - # inet 192.168.100.100/24 scope global eth0 - # and broadcast needs to be calculated from CIDR - broadcast = str(netaddr.IPNetwork(parts[1]).broadcast) - scope = parts[3] - - retval.append(dict(cidr=parts[1], - broadcast=broadcast, - scope=scope, - ip_version=version, - dynamic=('dynamic' == parts[-1]))) - return retval - - -class IpRouteCommand(IpDeviceCommandBase): - COMMAND = 'route' - - def add_gateway(self, gateway, metric=None, table=None): - args = ['replace', 'default', 'via', gateway] - if metric: - args += ['metric', metric] - args += ['dev', self.name] - if table: - args += ['table', table] - self._as_root(*args) - - def delete_gateway(self, gateway=None, table=None): - args = ['del', 'default'] - if gateway: - args += ['via', gateway] - args += ['dev', self.name] - if table: - args += ['table', table] - self._as_root(*args) - - def list_onlink_routes(self): - def iterate_routes(): - output = self._run('list', 'dev', self.name, 'scope', 'link') - for line in output.split('\n'): - line = line.strip() - if line and not line.count('src'): - yield line - - return [x for x in iterate_routes()] - - def add_onlink_route(self, cidr): - self._as_root('replace', cidr, 'dev', self.name, 'scope', 'link') - - def delete_onlink_route(self, cidr): - self._as_root('del', cidr, 'dev', self.name, 'scope', 'link') - - def get_gateway(self, scope=None, filters=None): - if filters is None: - filters = [] - - retval = None - - if scope: - filters += ['scope', scope] - - route_list_lines = self._run('list', 'dev', self.name, - *filters).split('\n') - default_route_line = next((x.strip() for x in - route_list_lines if - x.strip().startswith('default')), None) - if default_route_line: - gateway_index = 2 - parts = default_route_line.split() - retval = dict(gateway=parts[gateway_index]) - if 'metric' in parts: - metric_index = parts.index('metric') + 1 - retval.update(metric=int(parts[metric_index])) - - return retval - - def pullup_route(self, interface_name): - """Route entry pullup. - - Ensures that the route entry for the interface is before all - others on the same subnet. - """ - device_list = [] - device_route_list_lines = self._run('list', 'proto', 'kernel', - 'dev', interface_name).split('\n') - for device_route_line in device_route_list_lines: - try: - subnet = device_route_line.split()[0] - except Exception: - continue - subnet_route_list_lines = self._run('list', 'proto', 'kernel', - 'match', subnet).split('\n') - for subnet_route_line in subnet_route_list_lines: - i = iter(subnet_route_line.split()) - while(next(i) != 'dev'): - pass - device = next(i) - try: - while(next(i) != 'src'): - pass - src = next(i) - except Exception: - src = '' - if device != interface_name: - device_list.append((device, src)) - else: - break - - for (device, src) in device_list: - self._as_root('del', subnet, 'dev', device) - if (src != ''): - self._as_root('append', subnet, 'proto', 'kernel', - 'src', src, 'dev', device) - else: - self._as_root('append', subnet, 'proto', 'kernel', - 'dev', device) - - def add_route(self, cidr, ip, table=None): - args = ['replace', cidr, 'via', ip, 'dev', self.name] - if table: - args += ['table', table] - self._as_root(*args) - - def delete_route(self, cidr, ip, table=None): - args = ['del', cidr, 'via', ip, 'dev', self.name] - if table: - args += ['table', table] - self._as_root(*args) - - -class IpNeighCommand(IpDeviceCommandBase): - COMMAND = 'neigh' - - def add(self, ip_version, ip_address, mac_address): - self._as_root('replace', - ip_address, - 'lladdr', - mac_address, - 'nud', - 'permanent', - 'dev', - self.name, - options=[ip_version]) - - def delete(self, ip_version, ip_address, mac_address): - self._as_root('del', - ip_address, - 'lladdr', - mac_address, - 'dev', - self.name, - options=[ip_version]) - - -class IpNetnsCommand(IpCommandBase): - COMMAND = 'netns' - - def add(self, name): - self._as_root('add', name, use_root_namespace=True) - return IPWrapper(self._parent.root_helper, name) - - def delete(self, name): - self._as_root('delete', name, use_root_namespace=True) - - def execute(self, cmds, addl_env=None, check_exit_code=True): - if not self._parent.root_helper: - raise exceptions.SudoRequired() - ns_params = [] - if self._parent.namespace: - ns_params = ['ip', 'netns', 'exec', self._parent.namespace] - - env_params = [] - if addl_env: - env_params = (['env'] + - ['%s=%s' % pair for pair in addl_env.items()]) - return utils.execute( - ns_params + env_params + list(cmds), - root_helper=self._parent.root_helper, - check_exit_code=check_exit_code) - - def exists(self, name): - output = self._parent._execute('o', 'netns', ['list']) - - for line in output.split('\n'): - if name == line.strip(): - return True - return False - - -def device_exists(device_name, root_helper=None, namespace=None): - try: - address = IPDevice(device_name, root_helper, namespace).link.address - except RuntimeError: - return False - return bool(address) - - -def ensure_device_is_ready(device_name, root_helper=None, namespace=None): - dev = IPDevice(device_name, root_helper, namespace) - try: - # Ensure the device is up, even if it is already up. If the device - # doesn't exist, a RuntimeError will be raised. - dev.link.set_up() - except RuntimeError: - return False - return True - - -def iproute_arg_supported(command, arg, root_helper=None): - command += ['help'] - stdout, stderr = utils.execute(command, root_helper=root_helper, - check_exit_code=False, return_stderr=True) - return any(arg in line for line in stderr.split('\n')) diff --git a/tacker/agent/linux/polling.py b/tacker/agent/linux/polling.py deleted file mode 100644 index 8191b94d5..000000000 --- a/tacker/agent/linux/polling.py +++ /dev/null @@ -1,115 +0,0 @@ -# Copyright 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import contextlib - -import eventlet - -from tacker.agent.linux import ovsdb_monitor -from tacker.plugins.openvswitch.common import constants - - -@contextlib.contextmanager -def get_polling_manager(minimize_polling=False, - root_helper=None, - ovsdb_monitor_respawn_interval=( - constants.DEFAULT_OVSDBMON_RESPAWN)): - if minimize_polling: - pm = InterfacePollingMinimizer( - root_helper=root_helper, - ovsdb_monitor_respawn_interval=ovsdb_monitor_respawn_interval) - pm.start() - else: - pm = AlwaysPoll() - try: - yield pm - finally: - if minimize_polling: - pm.stop() - - -class BasePollingManager(object): - - def __init__(self): - self._force_polling = False - self._polling_completed = True - - def force_polling(self): - self._force_polling = True - - def polling_completed(self): - self._polling_completed = True - - def _is_polling_required(self): - raise NotImplementedError - - @property - def is_polling_required(self): - # Always consume the updates to minimize polling. - polling_required = self._is_polling_required() - - # Polling is required regardless of whether updates have been - # detected. - if self._force_polling: - self._force_polling = False - polling_required = True - - # Polling is required if not yet done for previously detected - # updates. - if not self._polling_completed: - polling_required = True - - if polling_required: - # Track whether polling has been completed to ensure that - # polling can be required until the caller indicates via a - # call to polling_completed() that polling has been - # successfully performed. - self._polling_completed = False - - return polling_required - - -class AlwaysPoll(BasePollingManager): - - def _is_polling_required(self): - return True - - @property - def is_polling_required(self): - return self._is_polling_required() - - -class InterfacePollingMinimizer(BasePollingManager): - """Monitors ovsdb to determine when polling is required.""" - - def __init__(self, root_helper=None, - ovsdb_monitor_respawn_interval=( - constants.DEFAULT_OVSDBMON_RESPAWN)): - - super(InterfacePollingMinimizer, self).__init__() - self._monitor = ovsdb_monitor.SimpleInterfaceMonitor( - root_helper=root_helper, - respawn_interval=ovsdb_monitor_respawn_interval) - - def start(self): - self._monitor.start() - - def stop(self): - self._monitor.stop() - - def _is_polling_required(self): - # Maximize the chances of update detection having a chance to - # collect output. - eventlet.sleep() - return self._monitor.has_updates diff --git a/tacker/agent/rpc.py b/tacker/agent/rpc.py deleted file mode 100644 index 016b7a666..000000000 --- a/tacker/agent/rpc.py +++ /dev/null @@ -1,109 +0,0 @@ -# Copyright (c) 2012 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import itertools - -from tacker.common import rpc_compat -from tacker.common import topics -from tacker.openstack.common import log as logging -from tacker.openstack.common import timeutils - - -LOG = logging.getLogger(__name__) - - -def create_consumers(endpoints, prefix, topic_details): - """Create agent RPC consumers. - - :param endpoints: The list of endpoints to process the incoming messages. - :param prefix: Common prefix for the plugin/agent message queues. - :param topic_details: A list of topics. Each topic has a name, an - operation, and an optional host param keying the - subscription to topic.host for plugin calls. - - :returns: A common Connection. - """ - - connection = rpc_compat.create_connection(new=True) - for details in topic_details: - topic, operation, node_name = itertools.islice( - itertools.chain(details, [None]), 3) - - topic_name = topics.get_topic_name(prefix, topic, operation) - connection.create_consumer(topic_name, endpoints, fanout=True) - if node_name: - node_topic_name = '%s.%s' % (topic_name, node_name) - connection.create_consumer(node_topic_name, - endpoints, - fanout=False) - connection.consume_in_threads() - return connection - - -class PluginReportStateAPI(rpc_compat.RpcProxy): - BASE_RPC_API_VERSION = '1.0' - - def __init__(self, topic): - super(PluginReportStateAPI, self).__init__( - topic=topic, default_version=self.BASE_RPC_API_VERSION) - - def report_state(self, context, agent_state, use_call=False): - msg = self.make_msg('report_state', - agent_state={'agent_state': - agent_state}, - time=timeutils.strtime()) - if use_call: - return self.call(context, msg, topic=self.topic) - else: - return self.cast(context, msg, topic=self.topic) - - -class PluginApi(rpc_compat.RpcProxy): - '''Agent side of the rpc API. - - API version history: - 1.0 - Initial version. - - ''' - - BASE_RPC_API_VERSION = '1.1' - - def __init__(self, topic): - super(PluginApi, self).__init__( - topic=topic, default_version=self.BASE_RPC_API_VERSION) - - def get_device_details(self, context, device, agent_id): - return self.call(context, - self.make_msg('get_device_details', device=device, - agent_id=agent_id), - topic=self.topic) - - def update_device_down(self, context, device, agent_id, host=None): - return self.call(context, - self.make_msg('update_device_down', device=device, - agent_id=agent_id, host=host), - topic=self.topic) - - def update_device_up(self, context, device, agent_id, host=None): - return self.call(context, - self.make_msg('update_device_up', device=device, - agent_id=agent_id, host=host), - topic=self.topic) - - def tunnel_sync(self, context, tunnel_ip, tunnel_type=None): - return self.call(context, - self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip, - tunnel_type=tunnel_type), - topic=self.topic) diff --git a/tacker/tests/fake_notifier.py b/tacker/tests/fake_notifier.py deleted file mode 100644 index 012f3351e..000000000 --- a/tacker/tests/fake_notifier.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright 2014 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import functools - - -NOTIFICATIONS = [] - - -def reset(): - del NOTIFICATIONS[:] - - -FakeMessage = collections.namedtuple('Message', - ['publisher_id', 'priority', - 'event_type', 'payload']) - - -class FakeNotifier(object): - - def __init__(self, transport, publisher_id=None): - self.transport = transport - self.publisher_id = publisher_id - for priority in ('debug', 'info', 'warn', 'error', 'critical'): - setattr(self, priority, - functools.partial(self._notify, priority=priority.upper())) - - def prepare(self, publisher_id=None): - if publisher_id is None: - publisher_id = self.publisher_id - return self.__class__(self.transport, publisher_id) - - def _notify(self, ctxt, event_type, payload, priority): - msg = dict(publisher_id=self.publisher_id, - priority=priority, - event_type=event_type, - payload=payload) - NOTIFICATIONS.append(msg) diff --git a/tacker/tests/functional/agent/__init__.py b/tacker/tests/functional/agent/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tacker/tests/functional/agent/linux/__init__.py b/tacker/tests/functional/agent/linux/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tacker/tests/unit/agent/__init__.py b/tacker/tests/unit/agent/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tacker/tests/unit/agent/linux/__init__.py b/tacker/tests/unit/agent/linux/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tacker/tests/unit/agent/linux/test_async_process.py b/tacker/tests/unit/agent/linux/test_async_process.py deleted file mode 100644 index 2232f01a2..000000000 --- a/tacker/tests/unit/agent/linux/test_async_process.py +++ /dev/null @@ -1,249 +0,0 @@ -# Copyright 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import eventlet.event -import eventlet.queue -import eventlet.timeout -import mock -import testtools - -from tacker.agent.linux import async_process -from tacker.agent.linux import utils -from tacker.tests import base - - -_marker = () - - -class TestAsyncProcess(base.BaseTestCase): - - def setUp(self): - super(TestAsyncProcess, self).setUp() - self.proc = async_process.AsyncProcess(['fake']) - - def test_construtor_raises_exception_for_negative_respawn_interval(self): - with testtools.ExpectedException(ValueError): - async_process.AsyncProcess(['fake'], respawn_interval=-1) - - def test__spawn(self): - expected_process = 'Foo' - proc = self.proc - with mock.patch.object(utils, 'create_process') as mock_create_process: - mock_create_process.return_value = [expected_process, None] - with mock.patch('eventlet.spawn') as mock_spawn: - proc._spawn() - - self.assertIsInstance(proc._kill_event, eventlet.event.Event) - self.assertEqual(proc._process, expected_process) - mock_spawn.assert_has_calls([ - mock.call(proc._watch_process, - proc._read_stdout, - proc._kill_event), - mock.call(proc._watch_process, - proc._read_stderr, - proc._kill_event), - ]) - self.assertEqual(len(proc._watchers), 2) - - def test__handle_process_error_kills_with_respawn(self): - with mock.patch.object(self.proc, '_kill') as kill: - self.proc._handle_process_error() - - kill.assert_has_calls(mock.call(respawning=False)) - - def test__handle_process_error_kills_without_respawn(self): - self.proc.respawn_interval = 1 - with mock.patch.object(self.proc, '_kill') as kill: - with mock.patch.object(self.proc, '_spawn') as spawn: - with mock.patch('eventlet.sleep') as sleep: - self.proc._handle_process_error() - - kill.assert_has_calls(mock.call(respawning=True)) - sleep.assert_has_calls(mock.call(self.proc.respawn_interval)) - spawn.assert_called_once_with() - - def _test__watch_process(self, callback, kill_event): - self.proc._kill_event = kill_event - # Ensure the test times out eventually if the watcher loops endlessly - with eventlet.timeout.Timeout(5): - with mock.patch.object(self.proc, - '_handle_process_error') as func: - self.proc._watch_process(callback, kill_event) - - if not kill_event.ready(): - func.assert_called_once_with() - - def test__watch_process_exits_on_callback_failure(self): - self._test__watch_process(lambda: False, eventlet.event.Event()) - - def test__watch_process_exits_on_exception(self): - def foo(): - raise Exception('Error!') - self._test__watch_process(foo, eventlet.event.Event()) - - def test__watch_process_exits_on_sent_kill_event(self): - kill_event = eventlet.event.Event() - kill_event.send() - self._test__watch_process(None, kill_event) - - def _test_read_output_queues_and_returns_result(self, output): - queue = eventlet.queue.LightQueue() - mock_stream = mock.Mock() - with mock.patch.object(mock_stream, 'readline') as mock_readline: - mock_readline.return_value = output - result = self.proc._read(mock_stream, queue) - - if output: - self.assertEqual(output, result) - self.assertEqual(output, queue.get_nowait()) - else: - self.assertFalse(result) - self.assertTrue(queue.empty()) - - def test__read_queues_and_returns_output(self): - self._test_read_output_queues_and_returns_result('foo') - - def test__read_returns_none_for_missing_output(self): - self._test_read_output_queues_and_returns_result('') - - def test_start_raises_exception_if_process_already_started(self): - self.proc._kill_event = True - with testtools.ExpectedException(async_process.AsyncProcessException): - self.proc.start() - - def test_start_invokes__spawn(self): - with mock.patch.object(self.proc, '_spawn') as mock_start: - self.proc.start() - - mock_start.assert_called_once_with() - - def test__iter_queue_returns_empty_list_for_empty_queue(self): - result = list(self.proc._iter_queue(eventlet.queue.LightQueue())) - self.assertEqual(result, []) - - def test__iter_queue_returns_queued_data(self): - queue = eventlet.queue.LightQueue() - queue.put('foo') - result = list(self.proc._iter_queue(queue)) - self.assertEqual(result, ['foo']) - - def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type): - expected_value = 'foo' - with mock.patch.object(self.proc, '_iter_queue') as mock_iter_queue: - mock_iter_queue.return_value = expected_value - target_func = getattr(self.proc, 'iter_%s' % output_type, None) - value = target_func() - - self.assertEqual(value, expected_value) - queue = getattr(self.proc, '_%s_lines' % output_type, None) - mock_iter_queue.assert_called_with(queue) - - def test_iter_stdout(self): - self._test_iter_output_calls_iter_queue_on_output_queue('stdout') - - def test_iter_stderr(self): - self._test_iter_output_calls_iter_queue_on_output_queue('stderr') - - def _test__kill(self, respawning, pid=None): - with mock.patch.object(self.proc, '_kill_event') as mock_kill_event: - with mock.patch.object(self.proc, '_get_pid_to_kill', - return_value=pid): - with mock.patch.object(self.proc, - '_kill_process') as mock_kill_process: - self.proc._kill(respawning) - - if respawning: - self.assertIsNotNone(self.proc._kill_event) - else: - self.assertIsNone(self.proc._kill_event) - - mock_kill_event.send.assert_called_once_with() - if pid: - mock_kill_process.assert_called_once_with(pid) - - def test__kill_when_respawning_does_not_clear_kill_event(self): - self._test__kill(True) - - def test__kill_when_not_respawning_clears_kill_event(self): - self._test__kill(False) - - def test__kill_targets_process_for_pid(self): - self._test__kill(False, pid='1') - - def _test__get_pid_to_kill(self, expected=_marker, - root_helper=None, pids=None): - def _find_child_pids(x): - if not pids: - return [] - pids.pop(0) - return pids - - if root_helper: - self.proc.root_helper = root_helper - - with mock.patch.object(self.proc, '_process') as mock_process: - with mock.patch.object(mock_process, 'pid') as mock_pid: - with mock.patch.object(utils, 'find_child_pids', - side_effect=_find_child_pids): - actual = self.proc._get_pid_to_kill() - if expected is _marker: - expected = mock_pid - self.assertEqual(expected, actual) - - def test__get_pid_to_kill_returns_process_pid_without_root_helper(self): - self._test__get_pid_to_kill() - - def test__get_pid_to_kill_returns_child_pid_with_root_helper(self): - self._test__get_pid_to_kill(expected='2', pids=['1', '2'], - root_helper='a') - - def test__get_pid_to_kill_returns_last_child_pid_with_root_Helper(self): - self._test__get_pid_to_kill(expected='3', pids=['1', '2', '3'], - root_helper='a') - - def test__get_pid_to_kill_returns_none_with_root_helper(self): - self._test__get_pid_to_kill(expected=None, root_helper='a') - - def _test__kill_process(self, pid, expected, exception_message=None): - self.proc.root_helper = 'foo' - if exception_message: - exc = RuntimeError(exception_message) - else: - exc = None - with mock.patch.object(utils, 'execute', - side_effect=exc) as mock_execute: - actual = self.proc._kill_process(pid) - - self.assertEqual(expected, actual) - mock_execute.assert_called_with(['kill', '-9', pid], - root_helper=self.proc.root_helper) - - def test__kill_process_returns_true_for_valid_pid(self): - self._test__kill_process('1', True) - - def test__kill_process_returns_true_for_stale_pid(self): - self._test__kill_process('1', True, 'No such process') - - def test__kill_process_returns_false_for_execute_exception(self): - self._test__kill_process('1', False, 'Invalid') - - def test_stop_calls_kill(self): - self.proc._kill_event = True - with mock.patch.object(self.proc, '_kill') as mock_kill: - self.proc.stop() - mock_kill.assert_called_once_with() - - def test_stop_raises_exception_if_already_started(self): - with testtools.ExpectedException(async_process.AsyncProcessException): - self.proc.stop()