Remove neutron agents codes

Partial-Bug: 1515864

Co-Authored-By: gong yong sheng <gong.yongsheng@99cloud.net>
Co-Authored-By: dharmendra <dharmendra.kushwaha@nectechnologies.in>

Change-Id: Id5ffba5767950a63097f99e0284a6215dc7e5476
This commit is contained in:
dharmendra 2016-04-07 21:19:58 +05:30 committed by gong yong sheng
parent eb7353fe34
commit 4d58c761a0
15 changed files with 0 additions and 2136 deletions

View File

@ -1,121 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_config import cfg
from tacker.common import config
from tacker.openstack.common import log as logging
LOG = logging.getLogger(__name__)
ROOT_HELPER_OPTS = [
cfg.StrOpt('root_helper', default='sudo',
help=_('Root helper application.')),
]
AGENT_STATE_OPTS = [
cfg.FloatOpt('report_interval', default=30,
help=_('Seconds between nodes reporting state to server; '
'should be less than agent_down_time, best if it '
'is half or less than agent_down_time.')),
]
INTERFACE_DRIVER_OPTS = [
cfg.StrOpt('interface_driver',
help=_("The driver used to manage the virtual interface.")),
]
USE_NAMESPACES_OPTS = [
cfg.BoolOpt('use_namespaces', default=True,
help=_("Allow overlapping IP.")),
]
def get_log_args(conf, log_file_name):
cmd_args = []
if conf.debug:
cmd_args.append('--debug')
if conf.verbose:
cmd_args.append('--verbose')
if (conf.log_dir or conf.log_file):
cmd_args.append('--log-file=%s' % log_file_name)
log_dir = None
if conf.log_dir and conf.log_file:
log_dir = os.path.dirname(
os.path.join(conf.log_dir, conf.log_file))
elif conf.log_dir:
log_dir = conf.log_dir
elif conf.log_file:
log_dir = os.path.dirname(conf.log_file)
if log_dir:
cmd_args.append('--log-dir=%s' % log_dir)
else:
if conf.use_syslog:
cmd_args.append('--use-syslog')
if conf.syslog_log_facility:
cmd_args.append(
'--syslog-log-facility=%s' % conf.syslog_log_facility)
return cmd_args
def register_root_helper(conf):
# The first call is to ensure backward compatibility
conf.register_opts(ROOT_HELPER_OPTS)
conf.register_opts(ROOT_HELPER_OPTS, 'AGENT')
def register_agent_state_opts_helper(conf):
conf.register_opts(AGENT_STATE_OPTS, 'AGENT')
def register_interface_driver_opts_helper(conf):
conf.register_opts(INTERFACE_DRIVER_OPTS)
def register_use_namespaces_opts_helper(conf):
conf.register_opts(USE_NAMESPACES_OPTS)
def get_root_helper(conf):
root_helper = conf.AGENT.root_helper
if root_helper != 'sudo':
return root_helper
root_helper = conf.root_helper
if root_helper != 'sudo':
LOG.deprecated(_('DEFAULT.root_helper is deprecated! Please move '
'root_helper configuration to [AGENT] section.'))
return root_helper
return 'sudo'
def setup_conf():
bind_opts = [
cfg.StrOpt('state_path',
default='/var/lib/tacker',
help=_('Top-level directory for maintaining dhcp state')),
]
conf = cfg.ConfigOpts()
conf.register_opts(bind_opts)
return conf
# add a logging setup method here for convenience
setup_logging = config.setup_logging

View File

@ -1,221 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import eventlet.event
import eventlet.queue
from tacker.agent.linux import utils
from tacker.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class AsyncProcessException(Exception):
pass
class AsyncProcess(object):
"""Manages an asynchronous process.
This class spawns a new process via subprocess and uses
greenthreads to read stderr and stdout asynchronously into queues
that can be read via repeatedly calling iter_stdout() and
iter_stderr().
If respawn_interval is non-zero, any error in communicating with
the managed process will result in the process and greenthreads
being cleaned up and the process restarted after the specified
interval.
Example usage:
>>> import time
>>> proc = AsyncProcess(['ping'])
>>> proc.start()
>>> time.sleep(5)
>>> proc.stop()
>>> for line in proc.iter_stdout():
... print line
"""
def __init__(self, cmd, root_helper=None, respawn_interval=None):
"""Constructor.
:param cmd: The list of command arguments to invoke.
:param root_helper: Optional, utility to use when running shell cmds.
:param respawn_interval: Optional, the interval in seconds to wait
to respawn after unexpected process death. Respawn will
only be attempted if a value of 0 or greater is provided.
"""
self.cmd = cmd
self.root_helper = root_helper
if respawn_interval is not None and respawn_interval < 0:
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
self.respawn_interval = respawn_interval
self._process = None
self._kill_event = None
self._reset_queues()
self._watchers = []
def _reset_queues(self):
self._stdout_lines = eventlet.queue.LightQueue()
self._stderr_lines = eventlet.queue.LightQueue()
def start(self):
"""Launch a process and monitor it asynchronously."""
if self._kill_event:
raise AsyncProcessException(_('Process is already started'))
else:
LOG.debug(_('Launching async process [%s].'), self.cmd)
self._spawn()
def stop(self):
"""Halt the process and watcher threads."""
if self._kill_event:
LOG.debug(_('Halting async process [%s].'), self.cmd)
self._kill()
else:
raise AsyncProcessException(_('Process is not running.'))
def _spawn(self):
"""Spawn a process and its watchers."""
self._kill_event = eventlet.event.Event()
self._process, cmd = utils.create_process(self.cmd,
root_helper=self.root_helper)
self._watchers = []
for reader in (self._read_stdout, self._read_stderr):
# Pass the stop event directly to the greenthread to
# ensure that assignment of a new event to the instance
# attribute does not prevent the greenthread from using
# the original event.
watcher = eventlet.spawn(self._watch_process,
reader,
self._kill_event)
self._watchers.append(watcher)
def _kill(self, respawning=False):
"""Kill the process and the associated watcher greenthreads.
:param respawning: Optional, whether respawn will be subsequently
attempted.
"""
# Halt the greenthreads
self._kill_event.send()
pid = self._get_pid_to_kill()
if pid:
self._kill_process(pid)
if not respawning:
# Clear the kill event to ensure the process can be
# explicitly started again.
self._kill_event = None
def _get_pid_to_kill(self):
pid = self._process.pid
# If root helper was used, two or more processes will be created:
#
# - a root helper process (e.g. sudo myscript)
# - possibly a rootwrap script (e.g. tacker-rootwrap)
# - a child process (e.g. myscript)
#
# Killing the root helper process will leave the child process
# running, re-parented to init, so the only way to ensure that both
# die is to target the child process directly.
if self.root_helper:
try:
pid = utils.find_child_pids(pid)[0]
except IndexError:
# Process is already dead
return None
while True:
try:
# We shouldn't have more than one child per process
# so keep getting the children of the first one
pid = utils.find_child_pids(pid)[0]
except IndexError:
# Last process in the tree, return it
break
return pid
def _kill_process(self, pid):
try:
# A process started by a root helper will be running as
# root and need to be killed via the same helper.
utils.execute(['kill', '-9', pid], root_helper=self.root_helper)
except Exception as ex:
stale_pid = (isinstance(ex, RuntimeError) and
'No such process' in str(ex))
if not stale_pid:
LOG.exception(_('An error occurred while killing [%s].'),
self.cmd)
return False
return True
def _handle_process_error(self):
"""Kill the async process and respawn if necessary."""
LOG.debug(_('Halting async process [%s] in response to an error.'),
self.cmd)
respawning = self.respawn_interval >= 0
self._kill(respawning=respawning)
if respawning:
eventlet.sleep(self.respawn_interval)
LOG.debug(_('Respawning async process [%s].'), self.cmd)
self._spawn()
def _watch_process(self, callback, kill_event):
while not kill_event.ready():
try:
if not callback():
break
except Exception:
LOG.exception(_('An error occurred while communicating '
'with async process [%s].'), self.cmd)
break
# Ensure that watching a process with lots of output does
# not block execution of other greenthreads.
eventlet.sleep()
# The kill event not being ready indicates that the loop was
# broken out of due to an error in the watched process rather
# than the loop condition being satisfied.
if not kill_event.ready():
self._handle_process_error()
def _read(self, stream, queue):
data = stream.readline()
if data:
data = data.strip()
queue.put(data)
return data
def _read_stdout(self):
return self._read(self._process.stdout, self._stdout_lines)
def _read_stderr(self):
return self._read(self._process.stderr, self._stderr_lines)
def _iter_queue(self, queue):
while True:
try:
yield queue.get_nowait()
except eventlet.queue.Empty:
break
def iter_stdout(self):
return self._iter_queue(self._stdout_lines)
def iter_stderr(self):
return self._iter_queue(self._stderr_lines)

View File

@ -1,147 +0,0 @@
# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import atexit
import fcntl
import os
import signal
import sys
from tacker.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class Pidfile(object):
def __init__(self, pidfile, procname, uuid=None):
self.pidfile = pidfile
self.procname = procname
self.uuid = uuid
try:
self.fd = os.open(pidfile, os.O_CREAT | os.O_RDWR)
fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
LOG.exception(_("Error while handling pidfile: %s"), pidfile)
sys.exit(1)
def __str__(self):
return self.pidfile
def unlock(self):
if not not fcntl.flock(self.fd, fcntl.LOCK_UN):
raise IOError(_('Unable to unlock pid file'))
def write(self, pid):
os.ftruncate(self.fd, 0)
os.write(self.fd, "%d" % pid)
os.fsync(self.fd)
def read(self):
try:
pid = int(os.read(self.fd, 128))
os.lseek(self.fd, 0, os.SEEK_SET)
return pid
except ValueError:
return
def is_running(self):
pid = self.read()
if not pid:
return False
cmdline = '/proc/%s/cmdline' % pid
try:
with open(cmdline, "r") as f:
exec_out = f.readline()
return self.procname in exec_out and (not self.uuid or
self.uuid in exec_out)
except IOError:
return False
class Daemon(object):
"""A generic daemon class.
Usage: subclass the Daemon class and override the run() method
"""
def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null',
stderr='/dev/null', procname='python', uuid=None):
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.procname = procname
self.pidfile = Pidfile(pidfile, procname, uuid)
def _fork(self):
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError:
LOG.exception(_('Fork failed'))
sys.exit(1)
def daemonize(self):
"""Daemonize process by doing Stevens double fork."""
# fork first time
self._fork()
# decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# fork second time
self._fork()
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
stdin = open(self.stdin, 'r')
stdout = open(self.stdout, 'a+')
stderr = open(self.stderr, 'a+', 0)
os.dup2(stdin.fileno(), sys.stdin.fileno())
os.dup2(stdout.fileno(), sys.stdout.fileno())
os.dup2(stderr.fileno(), sys.stderr.fileno())
# write pidfile
atexit.register(self.delete_pid)
signal.signal(signal.SIGTERM, self.handle_sigterm)
self.pidfile.write(os.getpid())
def delete_pid(self):
os.remove(str(self.pidfile))
def handle_sigterm(self, signum, frame):
sys.exit(0)
def start(self):
"""Start the daemon."""
if self.pidfile.is_running():
self.pidfile.unlock()
message = _('Pidfile %s already exist. Daemon already running?')
LOG.error(message, self.pidfile)
sys.exit(1)
# Start the daemon
self.daemonize()
self.run()
def run(self):
"""Override this method when subclassing Daemon.
start() will call this method after the process has daemonized.
"""
pass

View File

@ -1,100 +0,0 @@
# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_config import cfg
from tacker.agent.linux import ip_lib
from tacker.agent.linux import utils
from tacker.openstack.common import log as logging
LOG = logging.getLogger(__name__)
OPTS = [
cfg.StrOpt('external_pids',
default='$state_path/external/pids',
help=_('Location to store child pid files')),
]
cfg.CONF.register_opts(OPTS)
class ProcessManager(object):
"""An external process manager for Tacker spawned processes.
Note: The manager expects uuid to be in cmdline.
"""
def __init__(self, conf, uuid, root_helper='sudo', namespace=None):
self.conf = conf
self.uuid = uuid
self.root_helper = root_helper
self.namespace = namespace
def enable(self, cmd_callback):
if not self.active:
cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True))
ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
ip_wrapper.netns.execute(cmd)
def disable(self):
pid = self.pid
if self.active:
cmd = ['kill', '-9', pid]
utils.execute(cmd, self.root_helper)
elif pid:
LOG.debug(_('Process for %(uuid)s pid %(pid)d is stale, ignoring '
'command'), {'uuid': self.uuid, 'pid': pid})
else:
LOG.debug(_('No process started for %s'), self.uuid)
def get_pid_file_name(self, ensure_pids_dir=False):
"""Returns the file name for a given kind of config file."""
pids_dir = os.path.abspath(os.path.normpath(self.conf.external_pids))
if ensure_pids_dir and not os.path.isdir(pids_dir):
os.makedirs(pids_dir, 0o755)
return os.path.join(pids_dir, self.uuid + '.pid')
@property
def pid(self):
"""Last known pid for this external process spawned for this uuid."""
file_name = self.get_pid_file_name()
msg = _('Error while reading %s')
try:
with open(file_name, 'r') as f:
return int(f.read())
except IOError:
msg = _('Unable to access %s')
except ValueError:
msg = _('Unable to convert value in %s')
LOG.debug(msg, file_name)
return None
@property
def active(self):
pid = self.pid
if pid is None:
return False
cmdline = '/proc/%s/cmdline' % pid
try:
with open(cmdline, "r") as f:
return self.uuid in f.readline()
except IOError:
return False

View File

@ -1,454 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import netaddr
from oslo_config import cfg
import six
from tacker.agent.common import config
from tacker.agent.linux import ip_lib
from tacker.agent.linux import ovs_lib
from tacker.agent.linux import utils
from tacker.common import exceptions
from tacker.extensions import flavor
from tacker.openstack.common import importutils
from tacker.openstack.common import log as logging
LOG = logging.getLogger(__name__)
OPTS = [
cfg.StrOpt('ovs_integration_bridge',
default='br-int',
help=_('Name of Open vSwitch bridge to use')),
cfg.BoolOpt('ovs_use_veth',
default=False,
help=_('Uses veth for an interface or not')),
cfg.IntOpt('network_device_mtu',
help=_('MTU setting for device.')),
cfg.StrOpt('meta_flavor_driver_mappings',
help=_('Mapping between flavor and LinuxInterfaceDriver')),
cfg.StrOpt('admin_user',
help=_("Admin username")),
cfg.StrOpt('admin_password',
help=_("Admin password"),
secret=True),
cfg.StrOpt('admin_tenant_name',
help=_("Admin tenant name")),
cfg.StrOpt('auth_url',
help=_("Authentication URL")),
cfg.StrOpt('auth_strategy', default='keystone',
help=_("The type of authentication to use")),
cfg.StrOpt('auth_region',
help=_("Authentication region")),
]
@six.add_metaclass(abc.ABCMeta)
class LinuxInterfaceDriver(object):
# from linux IF_NAMESIZE
DEV_NAME_LEN = 14
DEV_NAME_PREFIX = 'tap'
def __init__(self, conf):
self.conf = conf
self.root_helper = config.get_root_helper(conf)
def init_l3(self, device_name, ip_cidrs, namespace=None,
preserve_ips=None, gateway=None, extra_subnets=None):
"""Set the L3 settings for the interface using data from the port.
ip_cidrs: list of 'X.X.X.X/YY' strings
preserve_ips: list of ip cidrs that should not be removed from device
"""
if not preserve_ips:
preserve_ips = []
if not extra_subnets:
extra_subnets = []
device = ip_lib.IPDevice(device_name,
self.root_helper,
namespace=namespace)
previous = {}
for address in device.addr.list(scope='global', filters=['permanent']):
previous[address['cidr']] = address['ip_version']
# add new addresses
for ip_cidr in ip_cidrs:
net = netaddr.IPNetwork(ip_cidr)
# Convert to compact IPv6 address because the return values of
# "ip addr list" are compact.
if net.version == 6:
ip_cidr = str(net)
if ip_cidr in previous:
del previous[ip_cidr]
continue
device.addr.add(net.version, ip_cidr, str(net.broadcast))
# clean up any old addresses
for ip_cidr, ip_version in previous.items():
if ip_cidr not in preserve_ips:
device.addr.delete(ip_version, ip_cidr)
if gateway:
device.route.add_gateway(gateway)
new_onlink_routes = set(s['cidr'] for s in extra_subnets)
existing_onlink_routes = set(device.route.list_onlink_routes())
for route in new_onlink_routes - existing_onlink_routes:
device.route.add_onlink_route(route)
for route in existing_onlink_routes - new_onlink_routes:
device.route.delete_onlink_route(route)
def check_bridge_exists(self, bridge):
if not ip_lib.device_exists(bridge):
raise exceptions.BridgeDoesNotExist(bridge=bridge)
def get_device_name(self, port):
return (self.DEV_NAME_PREFIX + port.id)[:self.DEV_NAME_LEN]
@abc.abstractmethod
def plug(self, network_id, port_id, device_name, mac_address,
bridge=None, namespace=None, prefix=None):
"""Plug in the interface."""
@abc.abstractmethod
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
"""Unplug the interface."""
class NullDriver(LinuxInterfaceDriver):
def plug(self, network_id, port_id, device_name, mac_address,
bridge=None, namespace=None, prefix=None):
pass
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
pass
class OVSInterfaceDriver(LinuxInterfaceDriver):
"""Driver for creating an internal interface on an OVS bridge."""
DEV_NAME_PREFIX = 'tap'
def __init__(self, conf):
super(OVSInterfaceDriver, self).__init__(conf)
if self.conf.ovs_use_veth:
self.DEV_NAME_PREFIX = 'ns-'
def _get_tap_name(self, dev_name, prefix=None):
if self.conf.ovs_use_veth:
dev_name = dev_name.replace(prefix or self.DEV_NAME_PREFIX, 'tap')
return dev_name
def _ovs_add_port(self, bridge, device_name, port_id, mac_address,
internal=True):
cmd = ['ovs-vsctl', '--', '--if-exists', 'del-port', device_name, '--',
'add-port', bridge, device_name]
if internal:
cmd += ['--', 'set', 'Interface', device_name, 'type=internal']
cmd += ['--', 'set', 'Interface', device_name,
'external-ids:iface-id=%s' % port_id,
'--', 'set', 'Interface', device_name,
'external-ids:iface-status=active',
'--', 'set', 'Interface', device_name,
'external-ids:attached-mac=%s' % mac_address]
utils.execute(cmd, self.root_helper)
def plug(self, network_id, port_id, device_name, mac_address,
bridge=None, namespace=None, prefix=None):
"""Plug in the interface."""
if not bridge:
bridge = self.conf.ovs_integration_bridge
if not ip_lib.device_exists(device_name,
self.root_helper,
namespace=namespace):
self.check_bridge_exists(bridge)
ip = ip_lib.IPWrapper(self.root_helper)
tap_name = self._get_tap_name(device_name, prefix)
if self.conf.ovs_use_veth:
# Create ns_dev in a namespace if one is configured.
root_dev, ns_dev = ip.add_veth(tap_name,
device_name,
namespace2=namespace)
else:
ns_dev = ip.device(device_name)
internal = not self.conf.ovs_use_veth
self._ovs_add_port(bridge, tap_name, port_id, mac_address,
internal=internal)
ns_dev.link.set_address(mac_address)
if self.conf.network_device_mtu:
ns_dev.link.set_mtu(self.conf.network_device_mtu)
if self.conf.ovs_use_veth:
root_dev.link.set_mtu(self.conf.network_device_mtu)
# Add an interface created by ovs to the namespace.
if not self.conf.ovs_use_veth and namespace:
namespace_obj = ip.ensure_namespace(namespace)
namespace_obj.add_device_to_namespace(ns_dev)
ns_dev.link.set_up()
if self.conf.ovs_use_veth:
root_dev.link.set_up()
else:
LOG.info(_("Device %s already exists"), device_name)
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
"""Unplug the interface."""
if not bridge:
bridge = self.conf.ovs_integration_bridge
tap_name = self._get_tap_name(device_name, prefix)
self.check_bridge_exists(bridge)
ovs = ovs_lib.OVSBridge(bridge, self.root_helper)
try:
ovs.delete_port(tap_name)
if self.conf.ovs_use_veth:
device = ip_lib.IPDevice(device_name,
self.root_helper,
namespace)
device.link.delete()
LOG.debug(_("Unplugged interface '%s'"), device_name)
except RuntimeError:
LOG.error(_("Failed unplugging interface '%s'"),
device_name)
class MidonetInterfaceDriver(LinuxInterfaceDriver):
def plug(self, network_id, port_id, device_name, mac_address,
bridge=None, namespace=None, prefix=None):
"""Driver for creating an interface.
This method is called by the Dhcp agent or by the L3 agent
when a new network is created
"""
if not ip_lib.device_exists(device_name,
self.root_helper,
namespace=namespace):
ip = ip_lib.IPWrapper(self.root_helper)
tap_name = device_name.replace(prefix or 'tap', 'tap')
# Create ns_dev in a namespace if one is configured.
root_dev, ns_dev = ip.add_veth(tap_name, device_name,
namespace2=namespace)
ns_dev.link.set_address(mac_address)
# Add an interface created by ovs to the namespace.
namespace_obj = ip.ensure_namespace(namespace)
namespace_obj.add_device_to_namespace(ns_dev)
ns_dev.link.set_up()
root_dev.link.set_up()
cmd = ['mm-ctl', '--bind-port', port_id, device_name]
utils.execute(cmd, self.root_helper)
else:
LOG.info(_("Device %s already exists"), device_name)
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
# the port will be deleted by the dhcp agent that will call the plugin
device = ip_lib.IPDevice(device_name,
self.root_helper,
namespace)
try:
device.link.delete()
except RuntimeError:
LOG.error(_("Failed unplugging interface '%s'"), device_name)
LOG.debug(_("Unplugged interface '%s'"), device_name)
ip_lib.IPWrapper(
self.root_helper, namespace).garbage_collect_namespace()
class IVSInterfaceDriver(LinuxInterfaceDriver):
"""Driver for creating an internal interface on an IVS bridge."""
DEV_NAME_PREFIX = 'tap'
def __init__(self, conf):
super(IVSInterfaceDriver, self).__init__(conf)
self.DEV_NAME_PREFIX = 'ns-'
def _get_tap_name(self, dev_name, prefix=None):
dev_name = dev_name.replace(prefix or self.DEV_NAME_PREFIX, 'tap')
return dev_name
def _ivs_add_port(self, device_name, port_id, mac_address):
cmd = ['ivs-ctl', 'add-port', device_name]
utils.execute(cmd, self.root_helper)
def plug(self, network_id, port_id, device_name, mac_address,
bridge=None, namespace=None, prefix=None):
"""Plug in the interface."""
if not ip_lib.device_exists(device_name,
self.root_helper,
namespace=namespace):
ip = ip_lib.IPWrapper(self.root_helper)
tap_name = self._get_tap_name(device_name, prefix)
root_dev, ns_dev = ip.add_veth(tap_name, device_name)
self._ivs_add_port(tap_name, port_id, mac_address)
ns_dev = ip.device(device_name)
ns_dev.link.set_address(mac_address)
if self.conf.network_device_mtu:
ns_dev.link.set_mtu(self.conf.network_device_mtu)
root_dev.link.set_mtu(self.conf.network_device_mtu)
if namespace:
namespace_obj = ip.ensure_namespace(namespace)
namespace_obj.add_device_to_namespace(ns_dev)
ns_dev.link.set_up()
root_dev.link.set_up()
else:
LOG.info(_("Device %s already exists"), device_name)
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
"""Unplug the interface."""
tap_name = self._get_tap_name(device_name, prefix)
try:
cmd = ['ivs-ctl', 'del-port', tap_name]
utils.execute(cmd, self.root_helper)
device = ip_lib.IPDevice(device_name,
self.root_helper,
namespace)
device.link.delete()
LOG.debug(_("Unplugged interface '%s'"), device_name)
except RuntimeError:
LOG.error(_("Failed unplugging interface '%s'"),
device_name)
class BridgeInterfaceDriver(LinuxInterfaceDriver):
"""Driver for creating bridge interfaces."""
DEV_NAME_PREFIX = 'ns-'
def plug(self, network_id, port_id, device_name, mac_address,
bridge=None, namespace=None, prefix=None):
"""Plugin the interface."""
if not ip_lib.device_exists(device_name,
self.root_helper,
namespace=namespace):
ip = ip_lib.IPWrapper(self.root_helper)
# Enable agent to define the prefix
if prefix:
tap_name = device_name.replace(prefix, 'tap')
else:
tap_name = device_name.replace(self.DEV_NAME_PREFIX, 'tap')
# Create ns_veth in a namespace if one is configured.
root_veth, ns_veth = ip.add_veth(tap_name, device_name,
namespace2=namespace)
ns_veth.link.set_address(mac_address)
if self.conf.network_device_mtu:
root_veth.link.set_mtu(self.conf.network_device_mtu)
ns_veth.link.set_mtu(self.conf.network_device_mtu)
root_veth.link.set_up()
ns_veth.link.set_up()
else:
LOG.info(_("Device %s already exists"), device_name)
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
"""Unplug the interface."""
device = ip_lib.IPDevice(device_name, self.root_helper, namespace)
try:
device.link.delete()
LOG.debug(_("Unplugged interface '%s'"), device_name)
except RuntimeError:
LOG.error(_("Failed unplugging interface '%s'"),
device_name)
class MetaInterfaceDriver(LinuxInterfaceDriver):
def __init__(self, conf):
super(MetaInterfaceDriver, self).__init__(conf)
from tackerclient.v2_0 import client
self.tacker = client.Client(
username=self.conf.admin_user,
password=self.conf.admin_password,
tenant_name=self.conf.admin_tenant_name,
auth_url=self.conf.auth_url,
auth_strategy=self.conf.auth_strategy,
region_name=self.conf.auth_region
)
self.flavor_driver_map = {}
for net_flavor, driver_name in [
driver_set.split(':')
for driver_set in
self.conf.meta_flavor_driver_mappings.split(',')]:
self.flavor_driver_map[net_flavor] = self._load_driver(driver_name)
def _get_flavor_by_network_id(self, network_id):
network = self.tacker.show_network(network_id)
return network['network'][flavor.FLAVOR_NETWORK]
def _get_driver_by_network_id(self, network_id):
net_flavor = self._get_flavor_by_network_id(network_id)
return self.flavor_driver_map[net_flavor]
def _set_device_plugin_tag(self, network_id, device_name, namespace=None):
plugin_tag = self._get_flavor_by_network_id(network_id)
device = ip_lib.IPDevice(device_name, self.conf.root_helper, namespace)
device.link.set_alias(plugin_tag)
def _get_device_plugin_tag(self, device_name, namespace=None):
device = ip_lib.IPDevice(device_name, self.conf.root_helper, namespace)
return device.link.alias
def get_device_name(self, port):
driver = self._get_driver_by_network_id(port.network_id)
return driver.get_device_name(port)
def plug(self, network_id, port_id, device_name, mac_address,
bridge=None, namespace=None, prefix=None):
driver = self._get_driver_by_network_id(network_id)
ret = driver.plug(network_id, port_id, device_name, mac_address,
bridge=bridge, namespace=namespace, prefix=prefix)
self._set_device_plugin_tag(network_id, device_name, namespace)
return ret
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
plugin_tag = self._get_device_plugin_tag(device_name, namespace)
driver = self.flavor_driver_map[plugin_tag]
return driver.unplug(device_name, bridge, namespace, prefix)
def _load_driver(self, driver_provider):
LOG.debug(_("Driver location: %s"), driver_provider)
plugin_klass = importutils.import_class(driver_provider)
return plugin_klass(self.conf)

View File

@ -1,570 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from oslo_config import cfg
from tacker.agent.linux import utils
from tacker.common import exceptions
OPTS = [
cfg.BoolOpt('ip_lib_force_root',
default=False,
help=_('Force ip_lib calls to use the root helper')),
]
VETH_MAX_NAME_LENGTH = 15
LOOPBACK_DEVNAME = 'lo'
# NOTE(ethuleau): depend of the version of iproute2, the vlan
# interface details vary.
VLAN_INTERFACE_DETAIL = ['vlan protocol 802.1q',
'vlan protocol 802.1Q',
'vlan id']
class SubProcessBase(object):
def __init__(self, root_helper=None, namespace=None):
self.root_helper = root_helper
self.namespace = namespace
try:
self.force_root = cfg.CONF.ip_lib_force_root
except cfg.NoSuchOptError:
# Only callers that need to force use of the root helper
# need to register the option.
self.force_root = False
def _run(self, options, command, args):
if self.namespace:
return self._as_root(options, command, args)
elif self.force_root:
# Force use of the root helper to ensure that commands
# will execute in dom0 when running under XenServer/XCP.
return self._execute(options, command, args, self.root_helper)
else:
return self._execute(options, command, args)
def _as_root(self, options, command, args, use_root_namespace=False):
if not self.root_helper:
raise exceptions.SudoRequired()
namespace = self.namespace if not use_root_namespace else None
return self._execute(options,
command,
args,
self.root_helper,
namespace)
@classmethod
def _execute(cls, options, command, args, root_helper=None,
namespace=None):
opt_list = ['-%s' % o for o in options]
if namespace:
ip_cmd = ['ip', 'netns', 'exec', namespace, 'ip']
else:
ip_cmd = ['ip']
return utils.execute(ip_cmd + opt_list + [command] + list(args),
root_helper=root_helper)
class IPWrapper(SubProcessBase):
def __init__(self, root_helper=None, namespace=None):
super(IPWrapper, self).__init__(root_helper=root_helper,
namespace=namespace)
self.netns = IpNetnsCommand(self)
def device(self, name):
return IPDevice(name, self.root_helper, self.namespace)
def get_devices(self, exclude_loopback=False):
retval = []
output = self._execute(['o', 'd'], 'link', ('list',),
self.root_helper, self.namespace)
for line in output.split('\n'):
if '<' not in line:
continue
tokens = line.split(' ', 2)
if len(tokens) == 3:
if any(v in tokens[2] for v in VLAN_INTERFACE_DETAIL):
delimiter = '@'
else:
delimiter = ':'
name = tokens[1].rpartition(delimiter)[0].strip()
if exclude_loopback and name == LOOPBACK_DEVNAME:
continue
retval.append(IPDevice(name,
self.root_helper,
self.namespace))
return retval
def add_tuntap(self, name, mode='tap'):
self._as_root('', 'tuntap', ('add', name, 'mode', mode))
return IPDevice(name, self.root_helper, self.namespace)
def add_veth(self, name1, name2, namespace2=None):
args = ['add', name1, 'type', 'veth', 'peer', 'name', name2]
if namespace2 is None:
namespace2 = self.namespace
else:
self.ensure_namespace(namespace2)
args += ['netns', namespace2]
self._as_root('', 'link', tuple(args))
return (IPDevice(name1, self.root_helper, self.namespace),
IPDevice(name2, self.root_helper, namespace2))
def ensure_namespace(self, name):
if not self.netns.exists(name):
ip = self.netns.add(name)
lo = ip.device(LOOPBACK_DEVNAME)
lo.link.set_up()
else:
ip = IPWrapper(self.root_helper, name)
return ip
def namespace_is_empty(self):
return not self.get_devices(exclude_loopback=True)
def garbage_collect_namespace(self):
"""Conditionally destroy the namespace if it is empty."""
if self.namespace and self.netns.exists(self.namespace):
if self.namespace_is_empty():
self.netns.delete(self.namespace)
return True
return False
def add_device_to_namespace(self, device):
if self.namespace:
device.link.set_netns(self.namespace)
def add_vxlan(self, name, vni, group=None, dev=None, ttl=None, tos=None,
local=None, port=None, proxy=False):
cmd = ['add', name, 'type', 'vxlan', 'id', vni]
if group:
cmd.extend(['group', group])
if dev:
cmd.extend(['dev', dev])
if ttl:
cmd.extend(['ttl', ttl])
if tos:
cmd.extend(['tos', tos])
if local:
cmd.extend(['local', local])
if proxy:
cmd.append('proxy')
# tuple: min,max
if port and len(port) == 2:
cmd.extend(['port', port[0], port[1]])
elif port:
raise exceptions.NetworkVxlanPortRangeError(vxlan_range=port)
self._as_root('', 'link', cmd)
return (IPDevice(name, self.root_helper, self.namespace))
@classmethod
def get_namespaces(cls, root_helper):
output = cls._execute('', 'netns', ('list',), root_helper=root_helper)
return [l.strip() for l in output.split('\n')]
class IpRule(IPWrapper):
def add_rule_from(self, ip, table, rule_pr):
args = ['add', 'from', ip, 'lookup', table, 'priority', rule_pr]
ip = self._as_root('', 'rule', tuple(args))
return ip
def delete_rule_priority(self, rule_pr):
args = ['del', 'priority', rule_pr]
ip = self._as_root('', 'rule', tuple(args))
return ip
class IPDevice(SubProcessBase):
def __init__(self, name, root_helper=None, namespace=None):
super(IPDevice, self).__init__(root_helper=root_helper,
namespace=namespace)
self.name = name
self.link = IpLinkCommand(self)
self.addr = IpAddrCommand(self)
self.route = IpRouteCommand(self)
self.neigh = IpNeighCommand(self)
def __eq__(self, other):
return (other is not None and self.name == other.name
and self.namespace == other.namespace)
def __str__(self):
return self.name
class IpCommandBase(object):
COMMAND = ''
def __init__(self, parent):
self._parent = parent
def _run(self, *args, **kwargs):
return self._parent._run(kwargs.get('options', []), self.COMMAND, args)
def _as_root(self, *args, **kwargs):
return self._parent._as_root(kwargs.get('options', []),
self.COMMAND,
args,
kwargs.get('use_root_namespace', False))
class IpDeviceCommandBase(IpCommandBase):
@property
def name(self):
return self._parent.name
class IpLinkCommand(IpDeviceCommandBase):
COMMAND = 'link'
def set_address(self, mac_address):
self._as_root('set', self.name, 'address', mac_address)
def set_mtu(self, mtu_size):
self._as_root('set', self.name, 'mtu', mtu_size)
def set_up(self):
self._as_root('set', self.name, 'up')
def set_down(self):
self._as_root('set', self.name, 'down')
def set_netns(self, namespace):
self._as_root('set', self.name, 'netns', namespace)
self._parent.namespace = namespace
def set_name(self, name):
self._as_root('set', self.name, 'name', name)
self._parent.name = name
def set_alias(self, alias_name):
self._as_root('set', self.name, 'alias', alias_name)
def delete(self):
self._as_root('delete', self.name)
@property
def address(self):
return self.attributes.get('link/ether')
@property
def state(self):
return self.attributes.get('state')
@property
def mtu(self):
return self.attributes.get('mtu')
@property
def qdisc(self):
return self.attributes.get('qdisc')
@property
def qlen(self):
return self.attributes.get('qlen')
@property
def alias(self):
return self.attributes.get('alias')
@property
def attributes(self):
return self._parse_line(self._run('show', self.name, options='o'))
def _parse_line(self, value):
if not value:
return {}
device_name, settings = value.replace("\\", '').split('>', 1)
tokens = settings.split()
keys = tokens[::2]
values = [int(v) if v.isdigit() else v for v in tokens[1::2]]
retval = dict(zip(keys, values))
return retval
class IpAddrCommand(IpDeviceCommandBase):
COMMAND = 'addr'
def add(self, ip_version, cidr, broadcast, scope='global'):
self._as_root('add',
cidr,
'brd',
broadcast,
'scope',
scope,
'dev',
self.name,
options=[ip_version])
def delete(self, ip_version, cidr):
self._as_root('del',
cidr,
'dev',
self.name,
options=[ip_version])
def flush(self):
self._as_root('flush', self.name)
def list(self, scope=None, to=None, filters=None):
if filters is None:
filters = []
retval = []
if scope:
filters += ['scope', scope]
if to:
filters += ['to', to]
for line in self._run('show', self.name, *filters).split('\n'):
line = line.strip()
if not line.startswith('inet'):
continue
parts = line.split()
if parts[0] == 'inet6':
version = 6
scope = parts[3]
broadcast = '::'
else:
version = 4
if parts[2] == 'brd':
broadcast = parts[3]
scope = parts[5]
else:
# sometimes output of 'ip a' might look like:
# inet 192.168.100.100/24 scope global eth0
# and broadcast needs to be calculated from CIDR
broadcast = str(netaddr.IPNetwork(parts[1]).broadcast)
scope = parts[3]
retval.append(dict(cidr=parts[1],
broadcast=broadcast,
scope=scope,
ip_version=version,
dynamic=('dynamic' == parts[-1])))
return retval
class IpRouteCommand(IpDeviceCommandBase):
COMMAND = 'route'
def add_gateway(self, gateway, metric=None, table=None):
args = ['replace', 'default', 'via', gateway]
if metric:
args += ['metric', metric]
args += ['dev', self.name]
if table:
args += ['table', table]
self._as_root(*args)
def delete_gateway(self, gateway=None, table=None):
args = ['del', 'default']
if gateway:
args += ['via', gateway]
args += ['dev', self.name]
if table:
args += ['table', table]
self._as_root(*args)
def list_onlink_routes(self):
def iterate_routes():
output = self._run('list', 'dev', self.name, 'scope', 'link')
for line in output.split('\n'):
line = line.strip()
if line and not line.count('src'):
yield line
return [x for x in iterate_routes()]
def add_onlink_route(self, cidr):
self._as_root('replace', cidr, 'dev', self.name, 'scope', 'link')
def delete_onlink_route(self, cidr):
self._as_root('del', cidr, 'dev', self.name, 'scope', 'link')
def get_gateway(self, scope=None, filters=None):
if filters is None:
filters = []
retval = None
if scope:
filters += ['scope', scope]
route_list_lines = self._run('list', 'dev', self.name,
*filters).split('\n')
default_route_line = next((x.strip() for x in
route_list_lines if
x.strip().startswith('default')), None)
if default_route_line:
gateway_index = 2
parts = default_route_line.split()
retval = dict(gateway=parts[gateway_index])
if 'metric' in parts:
metric_index = parts.index('metric') + 1
retval.update(metric=int(parts[metric_index]))
return retval
def pullup_route(self, interface_name):
"""Route entry pullup.
Ensures that the route entry for the interface is before all
others on the same subnet.
"""
device_list = []
device_route_list_lines = self._run('list', 'proto', 'kernel',
'dev', interface_name).split('\n')
for device_route_line in device_route_list_lines:
try:
subnet = device_route_line.split()[0]
except Exception:
continue
subnet_route_list_lines = self._run('list', 'proto', 'kernel',
'match', subnet).split('\n')
for subnet_route_line in subnet_route_list_lines:
i = iter(subnet_route_line.split())
while(next(i) != 'dev'):
pass
device = next(i)
try:
while(next(i) != 'src'):
pass
src = next(i)
except Exception:
src = ''
if device != interface_name:
device_list.append((device, src))
else:
break
for (device, src) in device_list:
self._as_root('del', subnet, 'dev', device)
if (src != ''):
self._as_root('append', subnet, 'proto', 'kernel',
'src', src, 'dev', device)
else:
self._as_root('append', subnet, 'proto', 'kernel',
'dev', device)
def add_route(self, cidr, ip, table=None):
args = ['replace', cidr, 'via', ip, 'dev', self.name]
if table:
args += ['table', table]
self._as_root(*args)
def delete_route(self, cidr, ip, table=None):
args = ['del', cidr, 'via', ip, 'dev', self.name]
if table:
args += ['table', table]
self._as_root(*args)
class IpNeighCommand(IpDeviceCommandBase):
COMMAND = 'neigh'
def add(self, ip_version, ip_address, mac_address):
self._as_root('replace',
ip_address,
'lladdr',
mac_address,
'nud',
'permanent',
'dev',
self.name,
options=[ip_version])
def delete(self, ip_version, ip_address, mac_address):
self._as_root('del',
ip_address,
'lladdr',
mac_address,
'dev',
self.name,
options=[ip_version])
class IpNetnsCommand(IpCommandBase):
COMMAND = 'netns'
def add(self, name):
self._as_root('add', name, use_root_namespace=True)
return IPWrapper(self._parent.root_helper, name)
def delete(self, name):
self._as_root('delete', name, use_root_namespace=True)
def execute(self, cmds, addl_env=None, check_exit_code=True):
if not self._parent.root_helper:
raise exceptions.SudoRequired()
ns_params = []
if self._parent.namespace:
ns_params = ['ip', 'netns', 'exec', self._parent.namespace]
env_params = []
if addl_env:
env_params = (['env'] +
['%s=%s' % pair for pair in addl_env.items()])
return utils.execute(
ns_params + env_params + list(cmds),
root_helper=self._parent.root_helper,
check_exit_code=check_exit_code)
def exists(self, name):
output = self._parent._execute('o', 'netns', ['list'])
for line in output.split('\n'):
if name == line.strip():
return True
return False
def device_exists(device_name, root_helper=None, namespace=None):
try:
address = IPDevice(device_name, root_helper, namespace).link.address
except RuntimeError:
return False
return bool(address)
def ensure_device_is_ready(device_name, root_helper=None, namespace=None):
dev = IPDevice(device_name, root_helper, namespace)
try:
# Ensure the device is up, even if it is already up. If the device
# doesn't exist, a RuntimeError will be raised.
dev.link.set_up()
except RuntimeError:
return False
return True
def iproute_arg_supported(command, arg, root_helper=None):
command += ['help']
stdout, stderr = utils.execute(command, root_helper=root_helper,
check_exit_code=False, return_stderr=True)
return any(arg in line for line in stderr.split('\n'))

View File

@ -1,115 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import eventlet
from tacker.agent.linux import ovsdb_monitor
from tacker.plugins.openvswitch.common import constants
@contextlib.contextmanager
def get_polling_manager(minimize_polling=False,
root_helper=None,
ovsdb_monitor_respawn_interval=(
constants.DEFAULT_OVSDBMON_RESPAWN)):
if minimize_polling:
pm = InterfacePollingMinimizer(
root_helper=root_helper,
ovsdb_monitor_respawn_interval=ovsdb_monitor_respawn_interval)
pm.start()
else:
pm = AlwaysPoll()
try:
yield pm
finally:
if minimize_polling:
pm.stop()
class BasePollingManager(object):
def __init__(self):
self._force_polling = False
self._polling_completed = True
def force_polling(self):
self._force_polling = True
def polling_completed(self):
self._polling_completed = True
def _is_polling_required(self):
raise NotImplementedError
@property
def is_polling_required(self):
# Always consume the updates to minimize polling.
polling_required = self._is_polling_required()
# Polling is required regardless of whether updates have been
# detected.
if self._force_polling:
self._force_polling = False
polling_required = True
# Polling is required if not yet done for previously detected
# updates.
if not self._polling_completed:
polling_required = True
if polling_required:
# Track whether polling has been completed to ensure that
# polling can be required until the caller indicates via a
# call to polling_completed() that polling has been
# successfully performed.
self._polling_completed = False
return polling_required
class AlwaysPoll(BasePollingManager):
def _is_polling_required(self):
return True
@property
def is_polling_required(self):
return self._is_polling_required()
class InterfacePollingMinimizer(BasePollingManager):
"""Monitors ovsdb to determine when polling is required."""
def __init__(self, root_helper=None,
ovsdb_monitor_respawn_interval=(
constants.DEFAULT_OVSDBMON_RESPAWN)):
super(InterfacePollingMinimizer, self).__init__()
self._monitor = ovsdb_monitor.SimpleInterfaceMonitor(
root_helper=root_helper,
respawn_interval=ovsdb_monitor_respawn_interval)
def start(self):
self._monitor.start()
def stop(self):
self._monitor.stop()
def _is_polling_required(self):
# Maximize the chances of update detection having a chance to
# collect output.
eventlet.sleep()
return self._monitor.has_updates

View File

@ -1,109 +0,0 @@
# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from tacker.common import rpc_compat
from tacker.common import topics
from tacker.openstack.common import log as logging
from tacker.openstack.common import timeutils
LOG = logging.getLogger(__name__)
def create_consumers(endpoints, prefix, topic_details):
"""Create agent RPC consumers.
:param endpoints: The list of endpoints to process the incoming messages.
:param prefix: Common prefix for the plugin/agent message queues.
:param topic_details: A list of topics. Each topic has a name, an
operation, and an optional host param keying the
subscription to topic.host for plugin calls.
:returns: A common Connection.
"""
connection = rpc_compat.create_connection(new=True)
for details in topic_details:
topic, operation, node_name = itertools.islice(
itertools.chain(details, [None]), 3)
topic_name = topics.get_topic_name(prefix, topic, operation)
connection.create_consumer(topic_name, endpoints, fanout=True)
if node_name:
node_topic_name = '%s.%s' % (topic_name, node_name)
connection.create_consumer(node_topic_name,
endpoints,
fanout=False)
connection.consume_in_threads()
return connection
class PluginReportStateAPI(rpc_compat.RpcProxy):
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(PluginReportStateAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def report_state(self, context, agent_state, use_call=False):
msg = self.make_msg('report_state',
agent_state={'agent_state':
agent_state},
time=timeutils.strtime())
if use_call:
return self.call(context, msg, topic=self.topic)
else:
return self.cast(context, msg, topic=self.topic)
class PluginApi(rpc_compat.RpcProxy):
'''Agent side of the rpc API.
API version history:
1.0 - Initial version.
'''
BASE_RPC_API_VERSION = '1.1'
def __init__(self, topic):
super(PluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def get_device_details(self, context, device, agent_id):
return self.call(context,
self.make_msg('get_device_details', device=device,
agent_id=agent_id),
topic=self.topic)
def update_device_down(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('update_device_down', device=device,
agent_id=agent_id, host=host),
topic=self.topic)
def update_device_up(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('update_device_up', device=device,
agent_id=agent_id, host=host),
topic=self.topic)
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
return self.call(context,
self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type),
topic=self.topic)

View File

@ -1,50 +0,0 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
NOTIFICATIONS = []
def reset():
del NOTIFICATIONS[:]
FakeMessage = collections.namedtuple('Message',
['publisher_id', 'priority',
'event_type', 'payload'])
class FakeNotifier(object):
def __init__(self, transport, publisher_id=None):
self.transport = transport
self.publisher_id = publisher_id
for priority in ('debug', 'info', 'warn', 'error', 'critical'):
setattr(self, priority,
functools.partial(self._notify, priority=priority.upper()))
def prepare(self, publisher_id=None):
if publisher_id is None:
publisher_id = self.publisher_id
return self.__class__(self.transport, publisher_id)
def _notify(self, ctxt, event_type, payload, priority):
msg = dict(publisher_id=self.publisher_id,
priority=priority,
event_type=event_type,
payload=payload)
NOTIFICATIONS.append(msg)

View File

@ -1,249 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet.event
import eventlet.queue
import eventlet.timeout
import mock
import testtools
from tacker.agent.linux import async_process
from tacker.agent.linux import utils
from tacker.tests import base
_marker = ()
class TestAsyncProcess(base.BaseTestCase):
def setUp(self):
super(TestAsyncProcess, self).setUp()
self.proc = async_process.AsyncProcess(['fake'])
def test_construtor_raises_exception_for_negative_respawn_interval(self):
with testtools.ExpectedException(ValueError):
async_process.AsyncProcess(['fake'], respawn_interval=-1)
def test__spawn(self):
expected_process = 'Foo'
proc = self.proc
with mock.patch.object(utils, 'create_process') as mock_create_process:
mock_create_process.return_value = [expected_process, None]
with mock.patch('eventlet.spawn') as mock_spawn:
proc._spawn()
self.assertIsInstance(proc._kill_event, eventlet.event.Event)
self.assertEqual(proc._process, expected_process)
mock_spawn.assert_has_calls([
mock.call(proc._watch_process,
proc._read_stdout,
proc._kill_event),
mock.call(proc._watch_process,
proc._read_stderr,
proc._kill_event),
])
self.assertEqual(len(proc._watchers), 2)
def test__handle_process_error_kills_with_respawn(self):
with mock.patch.object(self.proc, '_kill') as kill:
self.proc._handle_process_error()
kill.assert_has_calls(mock.call(respawning=False))
def test__handle_process_error_kills_without_respawn(self):
self.proc.respawn_interval = 1
with mock.patch.object(self.proc, '_kill') as kill:
with mock.patch.object(self.proc, '_spawn') as spawn:
with mock.patch('eventlet.sleep') as sleep:
self.proc._handle_process_error()
kill.assert_has_calls(mock.call(respawning=True))
sleep.assert_has_calls(mock.call(self.proc.respawn_interval))
spawn.assert_called_once_with()
def _test__watch_process(self, callback, kill_event):
self.proc._kill_event = kill_event
# Ensure the test times out eventually if the watcher loops endlessly
with eventlet.timeout.Timeout(5):
with mock.patch.object(self.proc,
'_handle_process_error') as func:
self.proc._watch_process(callback, kill_event)
if not kill_event.ready():
func.assert_called_once_with()
def test__watch_process_exits_on_callback_failure(self):
self._test__watch_process(lambda: False, eventlet.event.Event())
def test__watch_process_exits_on_exception(self):
def foo():
raise Exception('Error!')
self._test__watch_process(foo, eventlet.event.Event())
def test__watch_process_exits_on_sent_kill_event(self):
kill_event = eventlet.event.Event()
kill_event.send()
self._test__watch_process(None, kill_event)
def _test_read_output_queues_and_returns_result(self, output):
queue = eventlet.queue.LightQueue()
mock_stream = mock.Mock()
with mock.patch.object(mock_stream, 'readline') as mock_readline:
mock_readline.return_value = output
result = self.proc._read(mock_stream, queue)
if output:
self.assertEqual(output, result)
self.assertEqual(output, queue.get_nowait())
else:
self.assertFalse(result)
self.assertTrue(queue.empty())
def test__read_queues_and_returns_output(self):
self._test_read_output_queues_and_returns_result('foo')
def test__read_returns_none_for_missing_output(self):
self._test_read_output_queues_and_returns_result('')
def test_start_raises_exception_if_process_already_started(self):
self.proc._kill_event = True
with testtools.ExpectedException(async_process.AsyncProcessException):
self.proc.start()
def test_start_invokes__spawn(self):
with mock.patch.object(self.proc, '_spawn') as mock_start:
self.proc.start()
mock_start.assert_called_once_with()
def test__iter_queue_returns_empty_list_for_empty_queue(self):
result = list(self.proc._iter_queue(eventlet.queue.LightQueue()))
self.assertEqual(result, [])
def test__iter_queue_returns_queued_data(self):
queue = eventlet.queue.LightQueue()
queue.put('foo')
result = list(self.proc._iter_queue(queue))
self.assertEqual(result, ['foo'])
def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
expected_value = 'foo'
with mock.patch.object(self.proc, '_iter_queue') as mock_iter_queue:
mock_iter_queue.return_value = expected_value
target_func = getattr(self.proc, 'iter_%s' % output_type, None)
value = target_func()
self.assertEqual(value, expected_value)
queue = getattr(self.proc, '_%s_lines' % output_type, None)
mock_iter_queue.assert_called_with(queue)
def test_iter_stdout(self):
self._test_iter_output_calls_iter_queue_on_output_queue('stdout')
def test_iter_stderr(self):
self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
def _test__kill(self, respawning, pid=None):
with mock.patch.object(self.proc, '_kill_event') as mock_kill_event:
with mock.patch.object(self.proc, '_get_pid_to_kill',
return_value=pid):
with mock.patch.object(self.proc,
'_kill_process') as mock_kill_process:
self.proc._kill(respawning)
if respawning:
self.assertIsNotNone(self.proc._kill_event)
else:
self.assertIsNone(self.proc._kill_event)
mock_kill_event.send.assert_called_once_with()
if pid:
mock_kill_process.assert_called_once_with(pid)
def test__kill_when_respawning_does_not_clear_kill_event(self):
self._test__kill(True)
def test__kill_when_not_respawning_clears_kill_event(self):
self._test__kill(False)
def test__kill_targets_process_for_pid(self):
self._test__kill(False, pid='1')
def _test__get_pid_to_kill(self, expected=_marker,
root_helper=None, pids=None):
def _find_child_pids(x):
if not pids:
return []
pids.pop(0)
return pids
if root_helper:
self.proc.root_helper = root_helper
with mock.patch.object(self.proc, '_process') as mock_process:
with mock.patch.object(mock_process, 'pid') as mock_pid:
with mock.patch.object(utils, 'find_child_pids',
side_effect=_find_child_pids):
actual = self.proc._get_pid_to_kill()
if expected is _marker:
expected = mock_pid
self.assertEqual(expected, actual)
def test__get_pid_to_kill_returns_process_pid_without_root_helper(self):
self._test__get_pid_to_kill()
def test__get_pid_to_kill_returns_child_pid_with_root_helper(self):
self._test__get_pid_to_kill(expected='2', pids=['1', '2'],
root_helper='a')
def test__get_pid_to_kill_returns_last_child_pid_with_root_Helper(self):
self._test__get_pid_to_kill(expected='3', pids=['1', '2', '3'],
root_helper='a')
def test__get_pid_to_kill_returns_none_with_root_helper(self):
self._test__get_pid_to_kill(expected=None, root_helper='a')
def _test__kill_process(self, pid, expected, exception_message=None):
self.proc.root_helper = 'foo'
if exception_message:
exc = RuntimeError(exception_message)
else:
exc = None
with mock.patch.object(utils, 'execute',
side_effect=exc) as mock_execute:
actual = self.proc._kill_process(pid)
self.assertEqual(expected, actual)
mock_execute.assert_called_with(['kill', '-9', pid],
root_helper=self.proc.root_helper)
def test__kill_process_returns_true_for_valid_pid(self):
self._test__kill_process('1', True)
def test__kill_process_returns_true_for_stale_pid(self):
self._test__kill_process('1', True, 'No such process')
def test__kill_process_returns_false_for_execute_exception(self):
self._test__kill_process('1', False, 'Invalid')
def test_stop_calls_kill(self):
self.proc._kill_event = True
with mock.patch.object(self.proc, '_kill') as mock_kill:
self.proc.stop()
mock_kill.assert_called_once_with()
def test_stop_raises_exception_if_already_started(self):
with testtools.ExpectedException(async_process.AsyncProcessException):
self.proc.stop()