LBaaS Agent Reference Implementation

implements blueprint lbaas-namespace-agent

This a reference implemention of the Quantum load balancing service
using HAProxy.  The implemention is designed for vendors, developers,
and deployers to become familiar with the API and service workflow.

This change also adds some constraint checks for data integrity.

Change-Id: I10a67da11840477ccf063b98149f4f77248802a1
This commit is contained in:
Mark McClain 2013-02-24 07:55:06 -05:00
parent ce7f38cd40
commit 0ade06453c
34 changed files with 2359 additions and 328 deletions

26
bin/quantum-lbaas-agent Executable file
View File

@ -0,0 +1,26 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 Openstack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
sys.path.insert(0, os.getcwd())
from quantum.plugins.services.agent_loadbalancer.agent import main
main()

24
etc/lbaas_agent.ini Normal file
View File

@ -0,0 +1,24 @@
[DEFAULT]
# Show debugging output in log (sets DEBUG log level output)
# debug = true
# The LBaaS agent will resync its state with Quantum to recover from any
# transient notification or rpc errors. The interval is number of
# seconds between attempts.
# periodic_interval = 10
# OVS based plugins(OVS, Ryu, NEC, NVP, BigSwitch/Floodlight)
interface_driver = quantum.agent.linux.interface.OVSInterfaceDriver
# OVS based plugins(Ryu, NEC, NVP, BigSwitch/Floodlight) that use OVS
# as OpenFlow switch and check port status
# ovs_use_veth = True
# LinuxBridge
# interface_driver = quantum.agent.linux.interface.BridgeInterfaceDriver
# The agent requires a driver to manage the loadbalancer. HAProxy is the
# opensource version.
device_driver = quantum.plugins.services.agent_loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver
# Allow overlapping IP (Must have kernel build with CONFIG_NET_NS=y and
# iproute2 package that supports namespaces).
# use_namespaces = True

View File

@ -0,0 +1,29 @@
# quantum-rootwrap command filters for nodes on which quantum is
# expected to control network
#
# This file should be owned by (and only-writeable by) the root user
# format seems to be
# cmd-name: filter-name, raw-command, user, args
[Filters]
# haproxy
haproxy: CommandFilter, /usr/sbin/haproxy, root
# lbaas-agent uses kill as well, that's handled by the generic KillFilter
kill_haproxy_usr: KillFilter, root, /usr/sbin/haproxy, -9, -HUP
# lbaas-agent uses cat
cat: RegExpFilter, /bin/cat, root, cat, /proc/\d+/cmdline
ovs-vsctl: CommandFilter, /bin/ovs-vsctl, root
ovs-vsctl_usr: CommandFilter, /usr/bin/ovs-vsctl, root
ovs-vsctl_sbin: CommandFilter, /sbin/ovs-vsctl, root
ovs-vsctl_sbin_usr: CommandFilter, /usr/sbin/ovs-vsctl, root
# ip_lib
ip: IpFilter, /sbin/ip, root
ip_usr: IpFilter, /usr/sbin/ip, root
ip_exec: IpNetnsExecFilter, /sbin/ip, root
ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root

View File

@ -21,7 +21,6 @@ import re
import socket
import StringIO
import sys
import tempfile
import netaddr
from oslo.config import cfg
@ -187,7 +186,7 @@ class DhcpLocalProcess(DhcpBase):
def interface_name(self, value):
interface_file_path = self.get_conf_file_name('interface',
ensure_conf_dir=True)
replace_file(interface_file_path, value)
utils.replace_file(interface_file_path, value)
@abc.abstractmethod
def spawn_process(self):
@ -298,7 +297,7 @@ class Dnsmasq(DhcpLocalProcess):
(port.mac_address, name, alloc.ip_address))
name = self.get_conf_file_name('host')
replace_file(name, buf.getvalue())
utils.replace_file(name, buf.getvalue())
return name
def _output_opts_file(self):
@ -344,7 +343,7 @@ class Dnsmasq(DhcpLocalProcess):
options.append(self._format_option(i, 'router'))
name = self.get_conf_file_name('opts')
replace_file(name, '\n'.join(options))
utils.replace_file(name, '\n'.join(options))
return name
def _make_subnet_interface_ip_map(self):
@ -402,20 +401,3 @@ class Dnsmasq(DhcpLocalProcess):
sock.connect(dhcp_relay_socket)
sock.send(jsonutils.dumps(data))
sock.close()
def replace_file(file_name, data):
"""Replaces the contents of file_name with data in a safe manner.
First write to a temp file and then rename. Since POSIX renames are
atomic, the file is unlikely to be corrupted by competing writes.
We create the tempfile on the same device to ensure that it can be renamed.
"""
base_dir = os.path.dirname(os.path.abspath(file_name))
tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False)
tmp_file.write(data)
tmp_file.close()
os.chmod(tmp_file.name, 0644)
os.rename(tmp_file.name, file_name)

View File

@ -22,6 +22,7 @@ import os
import shlex
import socket
import struct
import tempfile
from eventlet.green import subprocess
@ -71,3 +72,20 @@ def get_interface_mac(interface):
struct.pack('256s', interface[:DEVICE_NAME_LEN]))
return ''.join(['%02x:' % ord(char)
for char in info[MAC_START:MAC_END]])[:-1]
def replace_file(file_name, data):
"""Replaces the contents of file_name with data in a safe manner.
First write to a temp file and then rename. Since POSIX renames are
atomic, the file is unlikely to be corrupted by competing writes.
We create the tempfile on the same device to ensure that it can be renamed.
"""
base_dir = os.path.dirname(os.path.abspath(file_name))
tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False)
tmp_file.write(data)
tmp_file.close()
os.chmod(tmp_file.name, 0644)
os.rename(tmp_file.name, file_name)

View File

@ -25,9 +25,11 @@ UPDATE = 'update'
AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin'
DHCP = 'q-dhcp-notifer'
LOADBALANCER_PLUGIN = 'q-loadbalancer-plugin'
L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent'
LOADBALANCER_AGENT = 'loadbalancer_agent'
def get_topic_name(prefix, table, operation):

View File

@ -69,7 +69,7 @@ class Vip(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
protocol_port = sa.Column(sa.Integer, nullable=False)
protocol = sa.Column(sa.Enum("HTTP", "HTTPS", "TCP", name="lb_protocols"),
nullable=False)
pool_id = sa.Column(sa.String(36), nullable=False)
pool_id = sa.Column(sa.String(36), nullable=False, unique=True)
session_persistence = orm.relationship(SessionPersistence,
uselist=False,
backref="vips",
@ -114,6 +114,7 @@ class Pool(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
cascade="all, delete-orphan")
monitors = orm.relationship("PoolMonitorAssociation", backref="pools",
cascade="all, delete-orphan")
vip = orm.relationship(Vip, backref='pool')
class HealthMonitor(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
@ -239,6 +240,12 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
raise
return r
def assert_modification_allowed(self, obj):
status = getattr(obj, 'status', None)
if status == constants.PENDING_DELETE:
raise loadbalancer.StateInvalid(id=id, state=status)
########################################################
# VIP DB access
def _make_vip_dict(self, vip, fields=None):
@ -270,11 +277,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
return self._fields(res, fields)
def _update_pool_vip_info(self, context, pool_id, vip_id):
pool_db = self._get_resource(context, Pool, pool_id)
with context.session.begin(subtransactions=True):
pool_db.update({'vip_id': vip_id})
def _check_session_persistence_info(self, info):
""" Performs sanity check on session persistence info.
:param info: Session persistence info
@ -355,6 +357,14 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
tenant_id = self._get_tenant_id_for_create(context, v)
with context.session.begin(subtransactions=True):
# validate that the pool has same tenant
if v['pool_id']:
pool = self._get_resource(context, Pool, v['pool_id'])
if pool['tenant_id'] != tenant_id:
raise q_exc.NotAuthorized()
else:
pool = None
vip_db = Vip(id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=v['name'],
@ -367,16 +377,18 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
admin_state_up=v['admin_state_up'],
status=constants.PENDING_CREATE)
vip_id = vip_db['id']
session_info = v['session_persistence']
if session_info:
s_p = self._create_session_persistence_db(session_info, vip_id)
s_p = self._create_session_persistence_db(
session_info,
vip_db['id'])
vip_db.session_persistence = s_p
context.session.add(vip_db)
context.session.flush()
# create a port to reserve address for IPAM
self._create_port_for_vip(
context,
vip_db,
@ -384,7 +396,9 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
v.get('address')
)
self._update_pool_vip_info(context, v['pool_id'], vip_id)
if pool:
pool['vip_id'] = vip_db['id']
return self._make_vip_dict(vip_db)
def update_vip(self, context, id, vip):
@ -392,20 +406,36 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
sess_persist = v.pop('session_persistence', None)
with context.session.begin(subtransactions=True):
vip_db = self._get_resource(context, Vip, id)
self.assert_modification_allowed(vip_db)
if sess_persist:
self._update_vip_session_persistence(context, id, sess_persist)
else:
self._delete_session_persistence(context, id)
vip_db = self._get_resource(context, Vip, id)
old_pool_id = vip_db['pool_id']
if v:
vip_db.update(v)
# If the pool_id is changed, we need to update
# the associated pools
if 'pool_id' in v:
self._update_pool_vip_info(context, old_pool_id, None)
self._update_pool_vip_info(context, v['pool_id'], id)
new_pool = self._get_resource(context, Pool, v['pool_id'])
self.assert_modification_allowed(new_pool)
# check that the pool matches the tenant_id
if new_pool['tenant_id'] != vip_db['tenant_id']:
raise q_exc.NotAuthorized()
if vip_db['pool_id']:
old_pool = self._get_resource(
context,
Pool,
vip_db['pool_id']
)
old_pool['vip_id'] = None
new_pool['vip_id'] = vip_db['id']
return self._make_vip_dict(vip_db)
@ -432,7 +462,7 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
########################################################
# Pool DB access
def _make_pool_dict(self, context, pool, fields=None):
def _make_pool_dict(self, pool, fields=None):
res = {'id': pool['id'],
'tenant_id': pool['tenant_id'],
'name': pool['name'],
@ -453,16 +483,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
return self._fields(res, fields)
def _update_pool_member_info(self, context, pool_id, membersInfo):
with context.session.begin(subtransactions=True):
member_qry = context.session.query(Member)
for member_id in membersInfo:
try:
member = member_qry.filter_by(id=member_id).one()
member.update({'pool_id': pool_id})
except exc.NoResultFound:
raise loadbalancer.MemberNotFound(member_id=member_id)
def _create_pool_stats(self, context, pool_id):
# This is internal method to add pool statistics. It won't
# be exposed to API
@ -504,17 +524,17 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
context.session.add(pool_db)
pool_db = self._get_resource(context, Pool, pool_db['id'])
return self._make_pool_dict(context, pool_db)
return self._make_pool_dict(pool_db)
def update_pool(self, context, id, pool):
v = pool['pool']
p = pool['pool']
with context.session.begin(subtransactions=True):
pool_db = self._get_resource(context, Pool, id)
if v:
pool_db.update(v)
if p:
pool_db.update(p)
return self._make_pool_dict(context, pool_db)
return self._make_pool_dict(pool_db)
def delete_pool(self, context, id):
# Check if the pool is in use
@ -529,15 +549,15 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
def get_pool(self, context, id, fields=None):
pool = self._get_resource(context, Pool, id)
return self._make_pool_dict(context, pool, fields)
return self._make_pool_dict(pool, fields)
def get_pools(self, context, filters=None, fields=None):
collection = self._model_query(context, Pool)
collection = self._apply_filters_to_query(collection, Pool, filters)
return [self._make_pool_dict(context, c, fields)
return [self._make_pool_dict(c, fields)
for c in collection.all()]
def get_stats(self, context, pool_id):
def stats(self, context, pool_id):
with context.session.begin(subtransactions=True):
pool_qry = context.session.query(Pool)
try:
@ -600,6 +620,7 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
raise loadbalancer.HealthMonitorNotFound(monitor_id=id)
def get_pool_health_monitor(self, context, id, pool_id, fields=None):
# TODO(markmcclain) look into why pool_id is ignored
healthmonitor = self._get_resource(context, HealthMonitor, id)
return self._make_health_monitor_dict(healthmonitor, fields)
@ -644,7 +665,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
v = member['member']
with context.session.begin(subtransactions=True):
member_db = self._get_resource(context, Member, id)
old_pool_id = member_db['pool_id']
if v:
member_db.update(v)

View File

@ -58,6 +58,7 @@ def upgrade(active_plugin=None, options=None):
sa.Column(u'admin_state_up', sa.Boolean(), nullable=False),
sa.Column(u'connection_limit', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['port_id'], ['ports.id'], ),
sa.UniqueConstraint('pool_id'),
sa.PrimaryKeyConstraint(u'id')
)
op.create_table(

View File

@ -68,6 +68,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'is_visible': True},
'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'default': '',
'is_visible': True},
'description': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
@ -128,6 +129,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'is_visible': True},
'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'default': '',
'is_visible': True},
'description': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},

View File

@ -0,0 +1,67 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import eventlet
from oslo.config import cfg
from quantum.agent.common import config
from quantum.agent.linux import interface
from quantum.common import topics
from quantum.openstack.common.rpc import service as rpc_service
from quantum.openstack.common import service
from quantum.plugins.services.agent_loadbalancer.agent import manager
OPTS = [
cfg.IntOpt(
'periodic_interval',
default=10,
help=_('Seconds between periodic task runs')
)
]
class LbaasAgentService(rpc_service.Service):
def start(self):
super(LbaasAgentService, self).start()
self.tg.add_timer(
cfg.CONF.periodic_interval,
self.manager.run_periodic_tasks,
None,
None
)
def main():
eventlet.monkey_patch()
cfg.CONF.register_opts(OPTS)
cfg.CONF.register_opts(manager.OPTS)
# import interface options just in case the driver uses namespaces
cfg.CONF.register_opts(interface.OPTS)
config.register_root_helper(cfg.CONF)
cfg.CONF(project='quantum')
config.setup_logging(cfg.CONF)
mgr = manager.LbaasAgentManager(cfg.CONF)
svc = LbaasAgentService(
host=cfg.CONF.host,
topic=topics.LOADBALANCER_AGENT,
manager=mgr
)
service.launch(svc).wait()

View File

@ -0,0 +1,81 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
from quantum.openstack.common.rpc import proxy
class LbaasAgentApi(proxy.RpcProxy):
"""Agent side of the Agent to Plugin RPC API."""
API_VERSION = '1.0'
def __init__(self, topic, context, host):
super(LbaasAgentApi, self).__init__(topic, self.API_VERSION)
self.context = context
self.host = host
def get_ready_devices(self):
return self.call(
self.context,
self.make_msg('get_ready_devices', host=self.host),
topic=self.topic
)
def get_logical_device(self, pool_id):
return self.call(
self.context,
self.make_msg(
'get_logical_device',
pool_id=pool_id,
host=self.host
),
topic=self.topic
)
def pool_destroyed(self, pool_id):
return self.call(
self.context,
self.make_msg('pool_destroyed', pool_id=pool_id, host=self.host),
topic=self.topic
)
def plug_vip_port(self, port_id):
return self.call(
self.context,
self.make_msg('plug_vip_port', port_id=port_id, host=self.host),
topic=self.topic
)
def unplug_vip_port(self, port_id):
return self.call(
self.context,
self.make_msg('unplug_vip_port', port_id=port_id, host=self.host),
topic=self.topic
)
def update_pool_stats(self, pool_id, stats):
return self.call(
self.context,
self.make_msg(
'update_pool_stats',
pool_id=pool_id,
stats=stats,
host=self.host
),
topic=self.topic
)

View File

@ -0,0 +1,221 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import weakref
from oslo.config import cfg
from quantum.agent.common import config
from quantum.common import topics
from quantum import context
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import periodic_task
from quantum.plugins.services.agent_loadbalancer.agent import api
LOG = logging.getLogger(__name__)
NS_PREFIX = 'qlbaas-'
OPTS = [
cfg.StrOpt(
'device_driver',
help=_('The driver used to manage the loadbalancing device'),
),
cfg.StrOpt(
'loadbalancer_state_path',
default='$state_path/lbaas',
help=_('Location to store config and state files'),
),
cfg.StrOpt(
'interface_driver',
help=_('The driver used to manage the virtual interface')
)
]
class LogicalDeviceCache(object):
"""Manage a cache of known devices."""
class Device(object):
"""Inner classes used to hold values for weakref lookups"""
def __init__(self, port_id, pool_id):
self.port_id = port_id
self.pool_id = pool_id
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __hash__(self):
return hash((self.port_id, self.pool_id))
def __init__(self):
self.devices = set()
self.port_lookup = weakref.WeakValueDictionary()
self.pool_lookup = weakref.WeakValueDictionary()
def put(self, device):
port_id = device['vip']['port_id']
pool_id = device['pool']['id']
d = self.Device(device['vip']['port_id'], device['pool']['id'])
if d not in self.devices:
self.devices.add(d)
self.port_lookup[port_id] = d
self.pool_lookup[pool_id] = d
def remove(self, device):
if not isinstance(device, self.Device):
device = self.Device(
device['vip']['port_id'], device['pool']['id']
)
if device in self.devices:
self.devices.remove(device)
def remove_by_pool_id(self, pool_id):
d = self.pool_lookup.get(pool_id)
if d:
self.devices.remove(d)
def get_by_pool_id(self, pool_id):
return self.pool_lookup.get(pool_id)
def get_by_port_id(self, port_id):
return self.port_lookup.get(port_id)
def get_pool_ids(self):
return self.pool_lookup.keys()
class LbaasAgentManager(periodic_task.PeriodicTasks):
def __init__(self, conf):
self.conf = conf
try:
vif_driver = importutils.import_object(conf.interface_driver, conf)
except ImportError:
# the driver is optional
msg = _('Error importing interface driver: %s')
raise SystemExit(msg % conf.interface_driver)
vif_driver = None
try:
self.driver = importutils.import_object(
conf.device_driver,
config.get_root_helper(self.conf),
conf.loadbalancer_state_path,
vif_driver,
self._vip_plug_callback
)
except ImportError:
msg = _('Error importing loadbalancer device driver: %s')
raise SystemExit(msg % conf.device_driver)
ctx = context.get_admin_context_without_session()
self.plugin_rpc = api.LbaasAgentApi(
topics.LOADBALANCER_PLUGIN,
ctx,
conf.host
)
self.needs_resync = False
self.cache = LogicalDeviceCache()
def initialize_service_hook(self, started_by):
self.sync_state()
@periodic_task.periodic_task
def periodic_resync(self, context):
if self.needs_resync:
self.needs_resync = False
self.sync_state()
@periodic_task.periodic_task(ticks_between_runs=6)
def collect_stats(self, context):
for pool_id in self.cache.get_pool_ids():
try:
stats = self.driver.get_stats(pool_id)
if stats:
self.plugin_rpc.update_pool_stats(pool_id, stats)
except Exception:
LOG.exception(_('Error upating stats'))
self.needs_resync = True
def _vip_plug_callback(self, action, port):
if action == 'plug':
self.plugin_rpc.plug_vip_port(port['id'])
elif action == 'unplug':
self.plugin_rpc.unplug_vip_port(port['id'])
def sync_state(self):
known_devices = set(self.cache.get_pool_ids())
try:
ready_logical_devices = set(self.plugin_rpc.get_ready_devices())
for deleted_id in known_devices - ready_logical_devices:
self.destroy_device(deleted_id)
for pool_id in ready_logical_devices:
self.refresh_device(pool_id)
except Exception:
LOG.exception(_('Unable to retrieve ready devices'))
self.needs_resync = True
self.remove_orphans()
def refresh_device(self, pool_id):
try:
logical_config = self.plugin_rpc.get_logical_device(pool_id)
if self.driver.exists(pool_id):
self.driver.update(logical_config)
else:
self.driver.create(logical_config)
self.cache.put(logical_config)
except Exception:
LOG.exception(_('Unable to refresh device for pool: %s'), pool_id)
self.needs_resync = True
def destroy_device(self, pool_id):
device = self.cache.get_by_pool_id(pool_id)
if not device:
return
try:
self.driver.destroy(pool_id)
self.plugin_rpc.pool_destroyed(pool_id)
except Exception:
LOG.exception(_('Unable to destroy device for pool: %s'), pool_id)
self.needs_resync = True
self.cache.remove(device)
def remove_orphans(self):
try:
self.driver.remove_orphans(self.cache.get_pool_ids())
except NotImplementedError:
pass # Not all drivers will support this
def reload_pool(self, context, pool_id=None, host=None):
"""Handle RPC cast from plugin to reload a pool."""
if pool_id:
self.refresh_device(pool_id)
def modify_pool(self, context, pool_id=None, host=None):
"""Handle RPC cast from plugin to modify a pool if known to agent."""
if self.cache.get_by_pool_id(pool_id):
self.refresh_device(pool_id)
def destroy_pool(self, context, pool_id=None, host=None):
"""Handle RPC cast from plugin to destroy a pool if known to agent."""
if self.cache.get_by_pool_id(pool_id):
self.destroy_device(pool_id)

View File

@ -0,0 +1,33 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
LB_METHOD_ROUND_ROBIN = 'ROUND_ROBIN'
LB_METHOD_LEAST_CONNECTIONS = 'LEAST_CONNECTIONS'
LB_METHOD_SOURCE_IP = 'SOURCE_IP'
PROTOCOL_TCP = 'TCP'
PROTOCOL_HTTP = 'HTTP'
PROTOCOL_HTTPS = 'HTTPS'
HEALTH_MONITOR_PING = 'PING'
HEALTH_MONITOR_TCP = 'TCP'
HEALTH_MONITOR_HTTP = 'HTTP'
HEALTH_MONITOR_HTTPS = 'HTTPS'
SESSION_PERSISTENCE_SOURCE_IP = 'SOURCE_IP'
SESSION_PERSISTENCE_HTTP_COOKIE = 'HTTP_COOKIE'
SESSION_PERSISTENCE_APP_COOKIE = 'APP_COOKIE'

View File

@ -0,0 +1,17 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost

View File

@ -0,0 +1,17 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost

View File

@ -0,0 +1,184 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import itertools
from quantum.agent.linux import utils
from quantum.plugins.common import constants as qconstants
from quantum.plugins.services.agent_loadbalancer import constants
PROTOCOL_MAP = {
constants.PROTOCOL_TCP: 'tcp',
constants.PROTOCOL_HTTP: 'http',
constants.PROTOCOL_HTTPS: 'tcp',
}
BALANCE_MAP = {
constants.LB_METHOD_ROUND_ROBIN: 'roundrobin',
constants.LB_METHOD_LEAST_CONNECTIONS: 'leastconn',
constants.LB_METHOD_SOURCE_IP: 'source'
}
ACTIVE = qconstants.ACTIVE
def save_config(conf_path, logical_config, socket_path=None):
"""Convert a logical configuration to the HAProxy version"""
data = []
data.extend(_build_global(logical_config, socket_path=socket_path))
data.extend(_build_defaults(logical_config))
data.extend(_build_frontend(logical_config))
data.extend(_build_backend(logical_config))
utils.replace_file(conf_path, '\n'.join(data))
def _build_global(config, socket_path=None):
opts = [
'daemon',
'user nobody',
'group nogroup',
'log /dev/log local0',
'log /dev/log local1 notice'
]
if socket_path:
opts.append('stats socket %s mode 0666 level user' % socket_path)
return itertools.chain(['global'], ('\t' + o for o in opts))
def _build_defaults(config):
opts = [
'log global',
'retries 3',
'option redispatch',
'timeout connect 5000',
'timeout client 50000',
'timeout server 50000',
]
return itertools.chain(['defaults'], ('\t' + o for o in opts))
def _build_frontend(config):
protocol = config['vip']['protocol']
opts = [
'option tcplog',
'bind %s:%d' % (
_get_first_ip_from_port(config['vip']['port']),
config['vip']['protocol_port']
),
'mode %s' % PROTOCOL_MAP[protocol],
'default_backend %s' % config['pool']['id'],
]
if config['vip']['connection_limit'] >= 0:
opts.append('maxconn %s' % config['vip']['connection_limit'])
if protocol == constants.PROTOCOL_HTTP:
opts.append('option forwardfor')
return itertools.chain(
['frontend %s' % config['vip']['id']],
('\t' + o for o in opts)
)
def _build_backend(config):
protocol = config['pool']['protocol']
lb_method = config['pool']['lb_method']
opts = [
'mode %s' % PROTOCOL_MAP[protocol],
'balance %s' % BALANCE_MAP.get(lb_method, 'roundrobin')
]
if protocol == constants.PROTOCOL_HTTP:
opts.append('option forwardfor')
# add the first health_monitor (if available)
server_addon, health_opts = _get_server_health_option(config)
opts.extend(health_opts)
# add the members
opts.extend(
(('server %(id)s %(address)s:%(protocol_port)s '
'weight %(weight)s') % member) + server_addon
for member in config['members']
if (member['status'] == ACTIVE and member['admin_state_up'])
)
return itertools.chain(
['backend %s' % config['pool']['id']],
('\t' + o for o in opts)
)
def _get_first_ip_from_port(port):
for fixed_ip in port['fixed_ips']:
return fixed_ip['ip_address']
def _get_server_health_option(config):
"""return the first active health option"""
for monitor in config['healthmonitors']:
if monitor['status'] == ACTIVE and monitor['admin_state_up']:
break
else:
return '', []
server_addon = ' check inter %(delay)ds fall %(max_retries)d' % monitor
opts = [
'timeout check %ds' % monitor['timeout']
]
if monitor['type'] in (constants.HEALTH_MONITOR_HTTP,
constants.HEALTH_MONITOR_HTTPS):
opts.append('option httpchk %(http_method)s %(url_path)s' % monitor)
opts.append(
'http-check expect rstatus %s' %
'|'.join(_expand_expected_codes(monitor['expected_codes']))
)
if monitor['type'] == constants.HEALTH_MONITOR_HTTPS:
opts.append('option ssl-hello-chk')
return server_addon, opts
def _expand_expected_codes(codes):
"""Expand the expected code string in set of codes.
200-204 -> 200, 201, 202, 204
200, 203 -> 200, 203
"""
retval = set()
for code in codes.replace(',', ' ').split(' '):
code = code.strip()
if not code:
continue
elif '-' in code:
low, hi = code.split('-')[:2]
retval.update(str(i) for i in xrange(int(low), int(hi)))
else:
retval.add(code)
return retval

View File

@ -0,0 +1,182 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import os
import shutil
import socket
import netaddr
from quantum.agent.linux import ip_lib
from quantum.common import exceptions
from quantum.openstack.common import log as logging
from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
cfg as hacfg
)
LOG = logging.getLogger(__name__)
NS_PREFIX = 'qlbaas-'
class HaproxyNSDriver(object):
def __init__(self, root_helper, state_path, vif_driver, vip_plug_callback):
self.root_helper = root_helper
self.state_path = state_path
self.vif_driver = vif_driver
self.vip_plug_callback = vip_plug_callback
self.pool_to_port_id = {}
def create(self, logical_config):
pool_id = logical_config['pool']['id']
namespace = get_ns_name(pool_id)
self._plug(namespace, logical_config['vip']['port'])
self._spawn(logical_config)
def update(self, logical_config):
pool_id = logical_config['pool']['id']
pid_path = self._get_state_file_path(pool_id, 'pid')
extra_args = ['-sf']
extra_args.extend(p.strip() for p in open(pid_path, 'r'))
self._spawn(logical_config, extra_args)
def _spawn(self, logical_config, extra_cmd_args=()):
pool_id = logical_config['pool']['id']
namespace = get_ns_name(pool_id)
conf_path = self._get_state_file_path(pool_id, 'conf')
pid_path = self._get_state_file_path(pool_id, 'pid')
sock_path = self._get_state_file_path(pool_id, 'sock')
hacfg.save_config(conf_path, logical_config, sock_path)
cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
cmd.extend(extra_cmd_args)
ns = ip_lib.IPWrapper(self.root_helper, namespace)
ns.netns.execute(cmd)
# remember the pool<>port mapping
self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id']
def destroy(self, pool_id):
namespace = get_ns_name(pool_id)
ns = ip_lib.IPWrapper(self.root_helper, namespace)
pid_path = self._get_state_file_path(pool_id, 'pid')
sock_path = self._get_state_file_path(pool_id, 'sock')
# kill the process
kill_pids_in_file(ns, pid_path)
# unplug the ports
if pool_id in self.pool_to_port_id:
self._unplug(namespace, self.pool_to_port_id[pool_id])
# remove the configuration directory
conf_dir = os.path.dirname(self._get_state_file_path(pool_id, ''))
if os.path.isdir(conf_dir):
shutil.rmtree(conf_dir)
ns.garbage_collect_namespace()
def exists(self, pool_id):
namespace = get_ns_name(pool_id)
root_ns = ip_lib.IPWrapper(self.root_helper)
socket_path = self._get_state_file_path(pool_id, 'sock')
if root_ns.netns.exists(namespace) and os.path.exists(socket_path):
try:
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(socket_path)
return True
except socket.error:
pass
return False
def get_stats(self, pool_id):
pass
def remove_orphans(self, known_pool_ids):
raise NotImplementedError()
def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True):
"""Returns the file name for a given kind of config file."""
confs_dir = os.path.abspath(os.path.normpath(self.state_path))
conf_dir = os.path.join(confs_dir, pool_id)
if ensure_state_dir:
if not os.path.isdir(conf_dir):
os.makedirs(conf_dir, 0755)
return os.path.join(conf_dir, kind)
def _plug(self, namespace, port, reuse_existing=True):
self.vip_plug_callback('plug', port)
interface_name = self.vif_driver.get_device_name(Wrap(port))
if ip_lib.device_exists(interface_name, self.root_helper, namespace):
if not reuse_existing:
raise exceptions.PreexistingDeviceFailure(
dev_name=interface_name
)
else:
self.vif_driver.plug(
port['network_id'],
port['id'],
interface_name,
port['mac_address'],
namespace=namespace
)
cidrs = [
'%s/%s' % (ip['ip_address'],
netaddr.IPNetwork(ip['subnet']['cidr']).prefixlen)
for ip in port['fixed_ips']
]
self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace)
def _unplug(self, namespace, port_id):
port_stub = {'id': port_id}
self.vip_plug_callback('unplug', port_stub)
interface_name = self.vif_driver.get_device_name(Wrap(port_stub))
self.vif_driver.unplug(interface_name, namespace=namespace)
# NOTE (markmcclain) For compliance with interface.py which expects objects
class Wrap(object):
"""A light attribute wrapper for compatibility with the interface lib."""
def __init__(self, d):
self.__dict__.update(d)
def __getitem__(self, key):
return self.__dict__[key]
def get_ns_name(namespace_id):
return NS_PREFIX + namespace_id
def kill_pids_in_file(namespace_wrapper, pid_path):
if os.path.exists(pid_path):
with open(pid_path, 'r') as pids:
for pid in pids:
pid = pid.strip()
try:
namespace_wrapper.netns.execute(
['kill', '-9', pid.strip()]
)
except RuntimeError:
LOG.exception(
_('Unable to kill haproxy process: %s'),
pid
)

View File

@ -0,0 +1,338 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from oslo.config import cfg
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum.db import api as qdbapi
from quantum.db.loadbalancer import loadbalancer_db
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.plugins.common import constants
LOG = logging.getLogger(__name__)
ACTIVE_PENDING = (
constants.ACTIVE,
constants.PENDING_CREATE,
constants.PENDING_UPDATE
)
class LoadBalancerCallbacks(object):
RPC_API_VERSION = '1.0'
def __init__(self, plugin):
self.plugin = plugin
def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher([self])
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):
qry = context.session.query(
loadbalancer_db.Vip, loadbalancer_db.Pool
)
qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
up = True # makes pep8 and sqlalchemy happy
qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
return [p.id for v, p in qry.all()]
def get_logical_device(self, context, pool_id=None, activate=True,
**kwargs):
with context.session.begin(subtransactions=True):
qry = context.session.query(loadbalancer_db.Pool)
qry = qry.filter_by(id=pool_id)
pool = qry.one()
if activate:
# set all resources to active
if pool.status in ACTIVE_PENDING:
pool.status = constants.ACTIVE
if pool.vip.status in ACTIVE_PENDING:
pool.vip.status = constants.ACTIVE
for m in pool.members:
if m.status in ACTIVE_PENDING:
m.status = constants.ACTIVE
for hm in pool.monitors:
if hm.monitor.status in ACTIVE_PENDING:
hm.monitor.status = constants.ACTIVE
if (pool.status != constants.ACTIVE
or pool.vip.status != constants.ACTIVE):
raise Exception(_('Expected active pool and vip'))
retval = {}
retval['pool'] = self.plugin._make_pool_dict(pool)
retval['vip'] = self.plugin._make_vip_dict(pool.vip)
retval['vip']['port'] = (
self.plugin._core_plugin._make_port_dict(pool.vip.port)
)
for fixed_ip in retval['vip']['port']['fixed_ips']:
fixed_ip['subnet'] = (
self.plugin._core_plugin.get_subnet(
context,
fixed_ip['subnet_id']
)
)
retval['members'] = [
self.plugin._make_member_dict(m)
for m in pool.members if m.status == constants.ACTIVE
]
retval['healthmonitors'] = [
self.plugin._make_health_monitor_dict(hm.monitor)
for hm in pool.monitors
if hm.monitor.status == constants.ACTIVE
]
return retval
def pool_destroyed(self, context, pool_id=None, host=None):
"""Agent confirmation hook that a pool has been destroyed.
This method exists for subclasses to change the deletion
behavior.
"""
pass
def plug_vip_port(self, context, port_id=None, host=None):
if not port_id:
return
port = self.plugin._core_plugin.get_port(
context,
port_id
)
port['admin_state_up'] = True
port['device_owner'] = 'quantum:' + constants.LOADBALANCER
port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
self.plugin._core_plugin.update_port(
context,
port_id,
{'port': port}
)
def unplug_vip_port(self, context, port_id=None, host=None):
if not port_id:
return
port = self.plugin._core_plugin.get_port(
context,
port_id
)
port['admin_state_up'] = False
port['device_owner'] = ''
port['device_id'] = ''
try:
self.plugin._core_plugin.update_port(
context,
port_id,
{'port': port}
)
except q_exc.PortNotFound:
msg = _('Unable to find port %s to unplug. This can occur when '
'the Vip has been deleted first.')
LOG.debug(msg, port_id)
def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
# TODO (markmcclain): add stats collection
pass
class LoadBalancerAgentApi(proxy.RpcProxy):
"""Plugin side of plugin to agent RPC API."""
API_VERSION = '1.0'
def __init__(self, topic, host):
super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
self.host = host
def reload_pool(self, context, pool_id):
return self.cast(
context,
self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
topic=self.topic
)
def destroy_pool(self, context, pool_id):
return self.cast(
context,
self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
topic=self.topic
)
def modify_pool(self, context, pool_id):
return self.cast(
context,
self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
topic=self.topic
)
class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
"""
Implementation of the Quantum Loadbalancer Service Plugin.
This class manages the workflow of LBaaS request/response.
Most DB related works are implemented in class
loadbalancer_db.LoadBalancerPluginDb.
"""
supported_extension_aliases = ["lbaas"]
def __init__(self):
"""
Do the initialization for the loadbalancer service plugin here.
"""
qdbapi.register_models()
self.callbacks = LoadBalancerCallbacks(self)
self.conn = rpc.create_connection(new=True)
self.conn.create_consumer(
topics.LOADBALANCER_PLUGIN,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
self.conn.consume_in_thread()
self.agent_rpc = LoadBalancerAgentApi(
topics.LOADBALANCER_AGENT,
cfg.CONF.host
)
def get_plugin_type(self):
return constants.LOADBALANCER
def get_plugin_description(self):
return "Quantum LoadBalancer Service Plugin"
def create_vip(self, context, vip):
vip['vip']['status'] = constants.PENDING_CREATE
v = super(LoadBalancerPlugin, self).create_vip(context, vip)
self.agent_rpc.reload_pool(context, v['pool_id'])
return v
def update_vip(self, context, id, vip):
if 'status' not in vip['vip']:
vip['vip']['status'] = constants.PENDING_UPDATE
v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
if v['status'] in ACTIVE_PENDING:
self.agent_rpc.reload_pool(context, v['pool_id'])
else:
self.agent_rpc.destroy_pool(context, v['pool_id'])
return v
def delete_vip(self, context, id):
vip = self.get_vip(context, id)
super(LoadBalancerPlugin, self).delete_vip(context, id)
self.agent_rpc.destroy_pool(context, vip['pool_id'])
pass
def create_pool(self, context, pool):
p = super(LoadBalancerPlugin, self).create_pool(context, pool)
# don't notify here because a pool needs a vip to be useful
return p
def update_pool(self, context, id, pool):
if 'status' not in pool['pool']:
pool['pool']['status'] = constants.PENDING_UPDATE
p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
if p['status'] in ACTIVE_PENDING:
self.agent_rpc.reload_pool(context, p['id'])
else:
self.agent_rpc.destroy_pool(context, p['id'])
return p
def delete_pool(self, context, id):
super(LoadBalancerPlugin, self).delete_pool(context, id)
self.agent_rpc.destroy_pool(context, id)
def create_member(self, context, member):
m = super(LoadBalancerPlugin, self).create_member(context, member)
self.agent_rpc.modify_pool(context, m['pool_id'])
return m
def update_member(self, context, id, member):
if 'status' not in member['member']:
member['member']['status'] = constants.PENDING_UPDATE
m = super(LoadBalancerPlugin, self).update_member(context, id, member)
self.agent_rpc.modify_pool(context, m['pool_id'])
return m
def delete_member(self, context, id):
m = self.get_member(context, id)
super(LoadBalancerPlugin, self).delete_member(context, id)
self.agent_rpc.modify_pool(context, m['pool_id'])
def update_health_monitor(self, context, id, health_monitor):
if 'status' not in health_monitor['health_monitor']:
health_monitor['health_monitor']['status'] = (
constants.PENDING_UPDATE
)
hm = super(LoadBalancerPlugin, self).update_health_monitor(
context,
id,
health_monitor
)
with context.session.begin(subtransactions=True):
qry = context.session.query(
loadbalancer_db.PoolMonitorAssociation
)
qry = qry.filter_by(monitor_id=hm['id'])
for assoc in qry.all():
self.agent_rpc.modify_pool(context, assoc['pool_id'])
return hm
def delete_health_monitor(self, context, id):
with context.session.begin(subtransactions=True):
qry = context.session.query(
loadbalancer_db.PoolMonitorAssociation
)
qry = qry.filter_by(monitor_id=id)
pool_ids = [a['pool_id'] for a in qry.all()]
super(LoadBalancerPlugin, self).delete_health_monitor(context, id)
for pid in pool_ids:
self.agent_rpc.modify_pool(context, pid)
def create_pool_health_monitor(self, context, health_monitor, pool_id):
retval = super(LoadBalancerPlugin, self).create_pool_health_monitor(
context,
health_monitor,
pool_id
)
self.agent_rpc.modify_pool(context, pool_id)
return retval

View File

@ -1,252 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from quantum.db import api as qdbapi
from quantum.db import model_base
from quantum.db.loadbalancer import loadbalancer_db
from quantum.extensions import loadbalancer
from quantum.openstack.common import log as logging
from quantum.plugins.common import constants
LOG = logging.getLogger(__name__)
class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
"""
Implementation of the Quantum Loadbalancer Service Plugin.
This class manages the workflow of LBaaS request/response.
Most DB related works are implemented in class
loadbalancer_db.LoadBalancerPluginDb.
"""
supported_extension_aliases = ["lbaas"]
def __init__(self):
"""
Do the initialization for the loadbalancer service plugin here.
"""
qdbapi.register_models(base=model_base.BASEV2)
# TODO: we probably need to setup RPC channel (to talk to LbAgent) here
def get_plugin_type(self):
return constants.LOADBALANCER
def get_plugin_description(self):
return "Quantum LoadBalancer Service Plugin"
def create_vip(self, context, vip):
v = super(LoadBalancerPlugin, self).create_vip(context, vip)
self.update_status(context, loadbalancer_db.Vip, v['id'],
constants.PENDING_CREATE)
LOG.debug(_("Create vip: %s"), v['id'])
# If we adopt asynchronous mode, this method should return immediately
# and let client to query the object status. The plugin will listen on
# the event from device and update the object status by calling
# self.update_state(context, Vip, id, ACTIVE/ERROR)
#
# In synchronous mode, send the request to device here and wait for
# response. Eventually update the object status prior to the return.
v_query = self.get_vip(context, v['id'])
return v_query
def update_vip(self, context, id, vip):
v_query = self.get_vip(
context, id, fields=["status"])
if v_query['status'] in [
constants.PENDING_DELETE, constants.ERROR]:
raise loadbalancer.StateInvalid(id=id,
state=v_query['status'])
v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
self.update_status(context, loadbalancer_db.Vip, id,
constants.PENDING_UPDATE)
LOG.debug(_("Update vip: %s"), id)
# TODO notify lbagent
v_rt = self.get_vip(context, id)
return v_rt
def delete_vip(self, context, id):
self.update_status(context, loadbalancer_db.Vip, id,
constants.PENDING_DELETE)
LOG.debug(_("Delete vip: %s"), id)
# TODO notify lbagent
super(LoadBalancerPlugin, self).delete_vip(context, id)
def get_vip(self, context, id, fields=None):
res = super(LoadBalancerPlugin, self).get_vip(context, id, fields)
LOG.debug(_("Get vip: %s"), id)
return res
def get_vips(self, context, filters=None, fields=None):
res = super(LoadBalancerPlugin, self).get_vips(
context, filters, fields)
LOG.debug(_("Get vips"))
return res
def create_pool(self, context, pool):
p = super(LoadBalancerPlugin, self).create_pool(context, pool)
self.update_status(context, loadbalancer_db.Pool, p['id'],
constants.PENDING_CREATE)
LOG.debug(_("Create pool: %s"), p['id'])
# TODO notify lbagent
p_rt = self.get_pool(context, p['id'])
return p_rt
def update_pool(self, context, id, pool):
p_query = self.get_pool(context, id, fields=["status"])
if p_query['status'] in [
constants.PENDING_DELETE, constants.ERROR]:
raise loadbalancer.StateInvalid(id=id,
state=p_query['status'])
p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
LOG.debug(_("Update pool: %s"), p['id'])
# TODO notify lbagent
p_rt = self.get_pool(context, id)
return p_rt
def delete_pool(self, context, id):
self.update_status(context, loadbalancer_db.Pool, id,
constants.PENDING_DELETE)
# TODO notify lbagent
super(LoadBalancerPlugin, self).delete_pool(context, id)
LOG.debug(_("Delete pool: %s"), id)
def get_pool(self, context, id, fields=None):
res = super(LoadBalancerPlugin, self).get_pool(context, id, fields)
LOG.debug(_("Get pool: %s"), id)
return res
def get_pools(self, context, filters=None, fields=None):
res = super(LoadBalancerPlugin, self).get_pools(
context, filters, fields)
LOG.debug(_("Get Pools"))
return res
def stats(self, context, pool_id):
res = super(LoadBalancerPlugin, self).get_stats(context, pool_id)
LOG.debug(_("Get stats of Pool: %s"), pool_id)
return res
def create_pool_health_monitor(self, context, health_monitor, pool_id):
m = super(LoadBalancerPlugin, self).create_pool_health_monitor(
context, health_monitor, pool_id)
LOG.debug(_("Create health_monitor of pool: %s"), pool_id)
return m
def get_pool_health_monitor(self, context, id, pool_id, fields=None):
m = super(LoadBalancerPlugin, self).get_pool_health_monitor(
context, id, pool_id, fields)
LOG.debug(_("Get health_monitor of pool: %s"), pool_id)
return m
def delete_pool_health_monitor(self, context, id, pool_id):
super(LoadBalancerPlugin, self).delete_pool_health_monitor(
context, id, pool_id)
LOG.debug(_("Delete health_monitor %(id)s of pool: %(pool_id)s"),
{"id": id, "pool_id": pool_id})
def get_member(self, context, id, fields=None):
res = super(LoadBalancerPlugin, self).get_member(
context, id, fields)
LOG.debug(_("Get member: %s"), id)
return res
def get_members(self, context, filters=None, fields=None):
res = super(LoadBalancerPlugin, self).get_members(
context, filters, fields)
LOG.debug(_("Get members"))
return res
def create_member(self, context, member):
m = super(LoadBalancerPlugin, self).create_member(context, member)
self.update_status(context, loadbalancer_db.Member, m['id'],
constants.PENDING_CREATE)
LOG.debug(_("Create member: %s"), m['id'])
# TODO notify lbagent
m_rt = self.get_member(context, m['id'])
return m_rt
def update_member(self, context, id, member):
m_query = self.get_member(context, id, fields=["status"])
if m_query['status'] in [
constants.PENDING_DELETE, constants.ERROR]:
raise loadbalancer.StateInvalid(id=id,
state=m_query['status'])
m = super(LoadBalancerPlugin, self).update_member(context, id, member)
self.update_status(context, loadbalancer_db.Member, id,
constants.PENDING_UPDATE)
LOG.debug(_("Update member: %s"), m['id'])
# TODO notify lbagent
m_rt = self.get_member(context, id)
return m_rt
def delete_member(self, context, id):
self.update_status(context, loadbalancer_db.Member, id,
constants.PENDING_DELETE)
LOG.debug(_("Delete member: %s"), id)
# TODO notify lbagent
super(LoadBalancerPlugin, self).delete_member(context, id)
def get_health_monitor(self, context, id, fields=None):
res = super(LoadBalancerPlugin, self).get_health_monitor(
context, id, fields)
LOG.debug(_("Get health_monitor: %s"), id)
return res
def get_health_monitors(self, context, filters=None, fields=None):
res = super(LoadBalancerPlugin, self).get_health_monitors(
context, filters, fields)
LOG.debug(_("Get health_monitors"))
return res
def create_health_monitor(self, context, health_monitor):
h = super(LoadBalancerPlugin, self).create_health_monitor(
context, health_monitor)
self.update_status(context, loadbalancer_db.HealthMonitor, h['id'],
constants.PENDING_CREATE)
LOG.debug(_("Create health_monitor: %s"), h['id'])
# TODO notify lbagent
h_rt = self.get_health_monitor(context, h['id'])
return h_rt
def update_health_monitor(self, context, id, health_monitor):
h_query = self.get_health_monitor(context, id, fields=["status"])
if h_query['status'] in [
constants.PENDING_DELETE, constants.ERROR]:
raise loadbalancer.StateInvalid(id=id,
state=h_query['status'])
h = super(LoadBalancerPlugin, self).update_health_monitor(
context, id, health_monitor)
self.update_status(context, loadbalancer_db.HealthMonitor, id,
constants.PENDING_UPDATE)
LOG.debug(_("Update health_monitor: %s"), h['id'])
# TODO notify lbagent
h_rt = self.get_health_monitor(context, id)
return h_rt
def delete_health_monitor(self, context, id):
self.update_status(context, loadbalancer_db.HealthMonitor, id,
constants.PENDING_DELETE)
LOG.debug(_("Delete health_monitor: %s"), id)
super(LoadBalancerPlugin, self).delete_health_monitor(context, id)

View File

@ -34,7 +34,9 @@ import quantum.extensions
from quantum.extensions import loadbalancer
from quantum.manager import QuantumManager
from quantum.plugins.common import constants
from quantum.plugins.services.loadbalancer import loadbalancerPlugin
from quantum.plugins.services.agent_loadbalancer import (
plugin as loadbalancer_plugin
)
from quantum.tests.unit import test_db_plugin
from quantum.tests.unit import test_extensions
from quantum.tests.unit import testlib_api
@ -46,8 +48,7 @@ LOG = logging.getLogger(__name__)
DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
DB_LB_PLUGIN_KLASS = (
"quantum.plugins.services.loadbalancer."
"loadbalancerPlugin.LoadBalancerPlugin"
"quantum.plugins.services.agent_loadbalancer.plugin.LoadBalancerPlugin"
)
ROOTDIR = os.path.dirname(__file__) + '../../../..'
ETCDIR = os.path.join(ROOTDIR, 'etc')
@ -74,7 +75,7 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
plugin = loadbalancerPlugin.LoadBalancerPlugin()
plugin = loadbalancer_plugin.LoadBalancerPlugin()
ext_mgr = PluginAwareExtensionManager(
extensions_path,
{constants.LOADBALANCER: plugin}

View File

@ -0,0 +1,17 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost

View File

@ -0,0 +1,17 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost

View File

@ -0,0 +1,17 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost

View File

@ -0,0 +1,135 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import mock
import testtools
from quantum.plugins.services.agent_loadbalancer.agent import api
class TestApiCache(testtools.TestCase):
def setUp(self):
super(TestApiCache, self).setUp()
self.addCleanup(mock.patch.stopall)
self.api = api.LbaasAgentApi('topic', mock.sentinel.context, 'host')
self.make_msg = mock.patch.object(self.api, 'make_msg').start()
self.mock_call = mock.patch.object(self.api, 'call').start()
def test_init(self):
self.assertEqual(self.api.host, 'host')
self.assertEqual(self.api.context, mock.sentinel.context)
def test_get_ready_devices(self):
self.assertEqual(
self.api.get_ready_devices(),
self.mock_call.return_value
)
self.make_msg.assert_called_once_with('get_ready_devices', host='host')
self.mock_call.assert_called_once_with(
mock.sentinel.context,
self.make_msg.return_value,
topic='topic'
)
def test_get_logical_device(self):
self.assertEqual(
self.api.get_logical_device('pool_id'),
self.mock_call.return_value
)
self.make_msg.assert_called_once_with(
'get_logical_device',
pool_id='pool_id',
host='host')
self.mock_call.assert_called_once_with(
mock.sentinel.context,
self.make_msg.return_value,
topic='topic'
)
def test_pool_destroyed(self):
self.assertEqual(
self.api.pool_destroyed('pool_id'),
self.mock_call.return_value
)
self.make_msg.assert_called_once_with(
'pool_destroyed',
pool_id='pool_id',
host='host')
self.mock_call.assert_called_once_with(
mock.sentinel.context,
self.make_msg.return_value,
topic='topic'
)
def test_plug_vip_port(self):
self.assertEqual(
self.api.plug_vip_port('port_id'),
self.mock_call.return_value
)
self.make_msg.assert_called_once_with(
'plug_vip_port',
port_id='port_id',
host='host')
self.mock_call.assert_called_once_with(
mock.sentinel.context,
self.make_msg.return_value,
topic='topic'
)
def test_unplug_vip_port(self):
self.assertEqual(
self.api.unplug_vip_port('port_id'),
self.mock_call.return_value
)
self.make_msg.assert_called_once_with(
'unplug_vip_port',
port_id='port_id',
host='host')
self.mock_call.assert_called_once_with(
mock.sentinel.context,
self.make_msg.return_value,
topic='topic'
)
def test_update_pool_stats(self):
self.assertEqual(
self.api.update_pool_stats('pool_id', {'stat': 'stat'}),
self.mock_call.return_value
)
self.make_msg.assert_called_once_with(
'update_pool_stats',
pool_id='pool_id',
stats={'stat': 'stat'},
host='host')
self.mock_call.assert_called_once_with(
mock.sentinel.context,
self.make_msg.return_value,
topic='topic'
)

View File

@ -0,0 +1,55 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import contextlib
import mock
from oslo.config import cfg
import testtools
from quantum.plugins.services.agent_loadbalancer import agent
class TestLbaasService(testtools.TestCase):
def setUp(self):
super(TestLbaasService, self).setUp()
self.addCleanup(cfg.CONF.reset)
cfg.CONF.register_opts(agent.OPTS)
def test_start(self):
with mock.patch.object(
agent.rpc_service.Service, 'start'
) as mock_start:
mgr = mock.Mock()
agent_service = agent.LbaasAgentService('host', 'topic', mgr)
agent_service.start()
self.assertTrue(mock_start.called)
def test_main(self):
with contextlib.nested(
mock.patch.object(agent.service, 'launch'),
mock.patch.object(agent, 'eventlet'),
mock.patch('sys.argv'),
mock.patch.object(agent.manager, 'LbaasAgentManager')
) as (mock_launch, mock_eventlet, sys_argv, mgr_cls):
agent.main()
self.assertTrue(mock_eventlet.monkey_patch.called)
mock_launch.assert_called_once_with(mock.ANY)

View File

@ -0,0 +1,365 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import contextlib
import mock
import testtools
from quantum.plugins.services.agent_loadbalancer.agent import manager
class TestLogicalDeviceCache(testtools.TestCase):
def setUp(self):
super(TestLogicalDeviceCache, self).setUp()
self.cache = manager.LogicalDeviceCache()
def test_put(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.assertEqual(len(self.cache.devices), 1)
self.assertEqual(len(self.cache.port_lookup), 1)
self.assertEqual(len(self.cache.pool_lookup), 1)
def test_double_put(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.cache.put(fake_device)
self.assertEqual(len(self.cache.devices), 1)
self.assertEqual(len(self.cache.port_lookup), 1)
self.assertEqual(len(self.cache.pool_lookup), 1)
def test_remove_in_cache(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.assertEqual(len(self.cache.devices), 1)
self.cache.remove(fake_device)
self.assertFalse(len(self.cache.devices))
self.assertFalse(self.cache.port_lookup)
self.assertFalse(self.cache.pool_lookup)
def test_remove_in_cache_same_object(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.assertEqual(len(self.cache.devices), 1)
self.cache.remove(set(self.cache.devices).pop())
self.assertFalse(len(self.cache.devices))
self.assertFalse(self.cache.port_lookup)
self.assertFalse(self.cache.pool_lookup)
def test_remove_by_pool_id(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.assertEqual(len(self.cache.devices), 1)
self.cache.remove_by_pool_id('pool_id')
self.assertFalse(len(self.cache.devices))
self.assertFalse(self.cache.port_lookup)
self.assertFalse(self.cache.pool_lookup)
def test_get_by_pool_id(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
dev = self.cache.get_by_pool_id('pool_id')
self.assertEqual(dev.pool_id, 'pool_id')
self.assertEqual(dev.port_id, 'port_id')
def test_get_by_port_id(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
dev = self.cache.get_by_port_id('port_id')
self.assertEqual(dev.pool_id, 'pool_id')
self.assertEqual(dev.port_id, 'port_id')
def test_get_pool_ids(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.assertEqual(self.cache.get_pool_ids(), ['pool_id'])
class TestManager(testtools.TestCase):
def setUp(self):
super(TestManager, self).setUp()
self.addCleanup(mock.patch.stopall)
mock_conf = mock.Mock()
mock_conf.interface_driver = 'intdriver'
mock_conf.device_driver = 'devdriver'
mock_conf.AGENT.root_helper = 'sudo'
mock_conf.loadbalancer_state_path = '/the/path'
self.mock_importer = mock.patch.object(manager, 'importutils').start()
rpc_mock_cls = mock.patch(
'quantum.plugins.services.agent_loadbalancer.agent.api'
'.LbaasAgentApi'
).start()
self.mgr = manager.LbaasAgentManager(mock_conf)
self.rpc_mock = rpc_mock_cls.return_value
self.log = mock.patch.object(manager, 'LOG').start()
self.mgr.needs_resync = False
def test_initialize_service_hook(self):
with mock.patch.object(self.mgr, 'sync_state') as sync:
self.mgr.initialize_service_hook(mock.Mock())
sync.assert_called_once_with()
def test_periodic_resync_needs_sync(self):
with mock.patch.object(self.mgr, 'sync_state') as sync:
self.mgr.needs_resync = True
self.mgr.periodic_resync(mock.Mock())
sync.assert_called_once_with()
def test_periodic_resync_no_sync(self):
with mock.patch.object(self.mgr, 'sync_state') as sync:
self.mgr.needs_resync = False
self.mgr.periodic_resync(mock.Mock())
self.assertFalse(sync.called)
def test_collect_stats(self):
with mock.patch.object(self.mgr, 'cache') as cache:
cache.get_pool_ids.return_value = ['1', '2']
self.mgr.collect_stats(mock.Mock())
self.rpc_mock.update_pool_stats.assert_has_calls([
mock.call('1', mock.ANY),
mock.call('2', mock.ANY)
])
def test_collect_stats_exception(self):
with mock.patch.object(self.mgr, 'cache') as cache:
cache.get_pool_ids.return_value = ['1', '2']
with mock.patch.object(self.mgr, 'driver') as driver:
driver.get_stats.side_effect = Exception
self.mgr.collect_stats(mock.Mock())
self.assertFalse(self.rpc_mock.called)
self.assertTrue(self.mgr.needs_resync)
self.assertTrue(self.log.exception.called)
def test_vip_plug_callback(self):
self.mgr._vip_plug_callback('plug', {'id': 'id'})
self.rpc_mock.plug_vip_port.assert_called_once_with('id')
def test_vip_unplug_callback(self):
self.mgr._vip_plug_callback('unplug', {'id': 'id'})
self.rpc_mock.unplug_vip_port.assert_called_once_with('id')
def _sync_state_helper(self, cache, ready, refreshed, destroyed):
with contextlib.nested(
mock.patch.object(self.mgr, 'cache'),
mock.patch.object(self.mgr, 'refresh_device'),
mock.patch.object(self.mgr, 'destroy_device')
) as (mock_cache, refresh, destroy):
mock_cache.get_pool_ids.return_value = cache
self.rpc_mock.get_ready_devices.return_value = ready
self.mgr.sync_state()
self.assertEqual(len(refreshed), len(refresh.mock_calls))
self.assertEqual(len(destroyed), len(destroy.mock_calls))
refresh.assert_has_calls([mock.call(i) for i in refreshed])
destroy.assert_has_calls([mock.call(i) for i in destroyed])
self.assertFalse(self.mgr.needs_resync)
def test_sync_state_all_known(self):
self._sync_state_helper(['1', '2'], ['1', '2'], ['1', '2'], [])
def test_sync_state_all_unknown(self):
self._sync_state_helper([], ['1', '2'], ['1', '2'], [])
def test_sync_state_destroy_all(self):
self._sync_state_helper(['1', '2'], [], [], ['1', '2'])
def test_sync_state_both(self):
self._sync_state_helper(['1'], ['2'], ['2'], ['1'])
def test_sync_state_exception(self):
self.rpc_mock.get_ready_devices.side_effect = Exception
self.mgr.sync_state()
self.assertTrue(self.log.exception.called)
self.assertTrue(self.mgr.needs_resync)
def test_refresh_device_exists(self):
config = self.rpc_mock.get_logical_device.return_value
with mock.patch.object(self.mgr, 'driver') as driver:
with mock.patch.object(self.mgr, 'cache') as cache:
driver.exists.return_value = True
self.mgr.refresh_device(config)
driver.exists.assert_called_once_with(config)
driver.update.assert_called_once_with(config)
cache.put.assert_called_once_with(config)
self.assertFalse(self.mgr.needs_resync)
def test_refresh_device_new(self):
config = self.rpc_mock.get_logical_device.return_value
with mock.patch.object(self.mgr, 'driver') as driver:
with mock.patch.object(self.mgr, 'cache') as cache:
driver.exists.return_value = False
self.mgr.refresh_device(config)
driver.exists.assert_called_once_with(config)
driver.create.assert_called_once_with(config)
cache.put.assert_called_once_with(config)
self.assertFalse(self.mgr.needs_resync)
def test_refresh_device_exception(self):
config = self.rpc_mock.get_logical_device.return_value
with mock.patch.object(self.mgr, 'driver') as driver:
with mock.patch.object(self.mgr, 'cache') as cache:
driver.exists.side_effect = Exception
self.mgr.refresh_device(config)
driver.exists.assert_called_once_with(config)
self.assertTrue(self.mgr.needs_resync)
self.assertTrue(self.log.exception.called)
self.assertFalse(cache.put.called)
def test_destroy_device_known(self):
with mock.patch.object(self.mgr, 'driver') as driver:
with mock.patch.object(self.mgr, 'cache') as cache:
cache.get_by_pool_id.return_value = True
self.mgr.destroy_device('pool_id')
cache.get_by_pool_id.assert_called_once_with('pool_id')
driver.destroy.assert_called_once_with('pool_id')
self.rpc_mock.pool_destroyed.assert_called_once_with(
'pool_id'
)
cache.remove.assert_called_once_with(True)
self.assertFalse(self.mgr.needs_resync)
def test_destroy_device_unknown(self):
with mock.patch.object(self.mgr, 'driver') as driver:
with mock.patch.object(self.mgr, 'cache') as cache:
cache.get_by_pool_id.return_value = None
self.mgr.destroy_device('pool_id')
cache.get_by_pool_id.assert_called_once_with('pool_id')
self.assertFalse(driver.destroy.called)
def test_destroy_device_exception(self):
with mock.patch.object(self.mgr, 'driver') as driver:
with mock.patch.object(self.mgr, 'cache') as cache:
cache.get_by_pool_id.return_value = True
driver.destroy.side_effect = Exception
self.mgr.destroy_device('pool_id')
cache.get_by_pool_id.assert_called_once_with('pool_id')
self.assertTrue(self.log.exception.called)
self.assertTrue(self.mgr.needs_resync)
def test_remove_orphans(self):
with mock.patch.object(self.mgr, 'driver') as driver:
with mock.patch.object(self.mgr, 'cache') as cache:
cache.get_pool_ids.return_value = ['1', '2']
self.mgr.remove_orphans()
driver.remove_orphans.assert_called_once_with(['1', '2'])
def test_reload_pool(self):
with mock.patch.object(self.mgr, 'refresh_device') as refresh:
self.mgr.reload_pool(mock.Mock(), pool_id='pool_id')
refresh.assert_called_once_with('pool_id')
def test_modify_pool_known(self):
with mock.patch.object(self.mgr, 'refresh_device') as refresh:
with mock.patch.object(self.mgr, 'cache') as cache:
cache.get_by_pool_id.return_value = True
self.mgr.reload_pool(mock.Mock(), pool_id='pool_id')
refresh.assert_called_once_with('pool_id')
def test_modify_pool_unknown(self):
with mock.patch.object(self.mgr, 'refresh_device') as refresh:
with mock.patch.object(self.mgr, 'cache') as cache:
cache.get_by_pool_id.return_value = False
self.mgr.modify_pool(mock.Mock(), pool_id='pool_id')
self.assertFalse(refresh.called)
def test_destroy_pool_known(self):
with mock.patch.object(self.mgr, 'destroy_device') as destroy:
with mock.patch.object(self.mgr, 'cache') as cache:
cache.get_by_pool_id.return_value = True
self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id')
destroy.assert_called_once_with('pool_id')
def test_destroy_pool_unknown(self):
with mock.patch.object(self.mgr, 'destroy_device') as destroy:
with mock.patch.object(self.mgr, 'cache') as cache:
cache.get_by_pool_id.return_value = False
self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id')
self.assertFalse(destroy.called)

View File

@ -0,0 +1,17 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost

View File

@ -0,0 +1,17 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost

View File

@ -0,0 +1,131 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import contextlib
import mock
import testtools
from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
namespace_driver
)
class TestHaproxyNSDriver(testtools.TestCase):
def setUp(self):
super(TestHaproxyNSDriver, self).setUp()
self.vif_driver = mock.Mock()
self.vip_plug_callback = mock.Mock()
self.driver = namespace_driver.HaproxyNSDriver(
'sudo',
'/the/path',
self.vif_driver,
self.vip_plug_callback
)
self.fake_config = {
'pool': {'id': 'pool_id'},
'vip': {'id': 'vip_id', 'port': {'id': 'port_id'}}
}
def test_create(self):
with mock.patch.object(self.driver, '_plug') as plug:
with mock.patch.object(self.driver, '_spawn') as spawn:
self.driver.create(self.fake_config)
plug.assert_called_once_with(
'qlbaas-pool_id', {'id': 'port_id'}
)
spawn.assert_called_once_with(self.fake_config)
def test_update(self):
with contextlib.nested(
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch.object(self.driver, '_spawn'),
mock.patch('__builtin__.open')
) as (gsp, spawn, mock_open):
mock_open.return_value = ['5']
self.driver.update(self.fake_config)
mock_open.assert_called_once_with(gsp.return_value, 'r')
spawn.assert_called_once_with(self.fake_config, ['-sf', '5'])
def test_spawn(self):
with contextlib.nested(
mock.patch.object(namespace_driver.hacfg, 'save_config'),
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch('quantum.agent.linux.ip_lib.IPWrapper')
) as (mock_save, gsp, ip_wrap):
gsp.side_effect = lambda x, y: y
self.driver._spawn(self.fake_config)
mock_save.assert_called_once_with('conf', self.fake_config, 'sock')
cmd = ['haproxy', '-f', 'conf', '-p', 'pid']
ip_wrap.assert_has_calls([
mock.call('sudo', 'qlbaas-pool_id'),
mock.call().netns.execute(cmd)
])
def test_destroy(self):
with contextlib.nested(
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch.object(namespace_driver, 'kill_pids_in_file'),
mock.patch.object(self.driver, '_unplug'),
mock.patch('quantum.agent.linux.ip_lib.IPWrapper'),
mock.patch('os.path.isdir'),
mock.patch('shutil.rmtree')
) as (gsp, kill, unplug, ip_wrap, isdir, rmtree):
gsp.side_effect = lambda x, y: '/pool/' + y
self.driver.pool_to_port_id['pool_id'] = 'port_id'
isdir.return_value = True
self.driver.destroy('pool_id')
kill.assert_called_once_with(ip_wrap(), '/pool/pid')
unplug.assert_called_once_with('qlbaas-pool_id', 'port_id')
isdir.called_once_with('/pool')
rmtree.assert_called_once_with('/pool')
ip_wrap.assert_has_calls([
mock.call('sudo', 'qlbaas-pool_id'),
mock.call().garbage_collect_namespace()
])
def test_exists(self):
with contextlib.nested(
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch('quantum.agent.linux.ip_lib.IPWrapper'),
mock.patch('socket.socket'),
mock.patch('os.path.exists'),
) as (gsp, ip_wrap, socket, path_exists):
gsp.side_effect = lambda x, y: '/pool/' + y
ip_wrap.return_value.netns.exists.return_value = True
path_exists.return_value = True
self.driver.exists('pool_id')
ip_wrap.assert_has_calls([
mock.call('sudo'),
mock.call().netns.exists('qlbaas-pool_id')
])
self.assertTrue(self.driver.exists('pool_id'))

View File

@ -0,0 +1,263 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import mock
import testtools
from quantum import context
from quantum import manager
from quantum.plugins.common import constants
from quantum.plugins.services.agent_loadbalancer import plugin
from quantum.tests.unit.db.loadbalancer import test_db_loadbalancer
class TestLoadBalancerPluginBase(
test_db_loadbalancer.LoadBalancerPluginDbTestCase):
def setUp(self):
super(TestLoadBalancerPluginBase, self).setUp()
# create another API instance to make testing easier
# pass a mock to our API instance
# we need access to loaded plugins to modify models
loaded_plugins = manager.QuantumManager().get_service_plugins()
self.plugin_instance = loaded_plugins[constants.LOADBALANCER]
self.callbacks = self.plugin_instance.callbacks
class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
def test_get_ready_devices(self):
with self.vip() as vip:
ready = self.callbacks.get_ready_devices(
context.get_admin_context(),
)
self.assertEqual(ready, [vip['vip']['pool_id']])
def test_get_ready_devices_inactive_vip(self):
with self.vip() as vip:
# set the vip inactive need to use plugin directly since
# status is not tenant mutable
self.plugin_instance.update_vip(
context.get_admin_context(),
vip['vip']['id'],
{'vip': {'status': constants.INACTIVE}}
)
ready = self.callbacks.get_ready_devices(
context.get_admin_context(),
)
self.assertFalse(ready)
def test_get_ready_devices_inactive_pool(self):
with self.vip() as vip:
# set the pool inactive need to use plugin directly since
# status is not tenant mutable
self.plugin_instance.update_pool(
context.get_admin_context(),
vip['vip']['pool_id'],
{'pool': {'status': constants.INACTIVE}}
)
ready = self.callbacks.get_ready_devices(
context.get_admin_context(),
)
self.assertFalse(ready)
def test_get_logical_device_inactive(self):
with self.pool() as pool:
with self.vip(pool=pool) as vip:
with self.member(pool_id=vip['vip']['pool_id']) as member:
self.assertRaises(
Exception,
self.callbacks.get_logical_device,
context.get_admin_context(),
pool['pool']['id'],
activate=False
)
def test_get_logical_device_activate(self):
with self.pool() as pool:
with self.vip(pool=pool) as vip:
with self.member(pool_id=vip['vip']['pool_id']) as member:
ctx = context.get_admin_context()
# build the expected
port = self.plugin_instance._core_plugin.get_port(
ctx, vip['vip']['port_id']
)
subnet = self.plugin_instance._core_plugin.get_subnet(
ctx, vip['vip']['subnet_id']
)
port['fixed_ips'][0]['subnet'] = subnet
# reload pool to add members and vip
pool = self.plugin_instance.get_pool(
ctx, pool['pool']['id']
)
pool['status'] = constants.ACTIVE
vip['vip']['status'] = constants.ACTIVE
vip['vip']['port'] = port
member['member']['status'] = constants.ACTIVE
expected = {
'pool': pool,
'vip': vip['vip'],
'members': [member['member']],
'healthmonitors': []
}
logical_config = self.callbacks.get_logical_device(
ctx, pool['id'], activate=True
)
self.assertEqual(logical_config, expected)
def _update_port_test_helper(self, expected, func, **kwargs):
core = self.plugin_instance._core_plugin
with self.pool() as pool:
with self.vip(pool=pool) as vip:
with self.member(pool_id=vip['vip']['pool_id']) as member:
ctx = context.get_admin_context()
func(ctx, port_id=vip['vip']['port_id'], **kwargs)
db_port = core.get_port(ctx, vip['vip']['port_id'])
for k, v in expected.iteritems():
self.assertEqual(db_port[k], v)
def test_plug_vip_port(self):
exp = {
'device_owner': 'quantum:' + constants.LOADBALANCER,
'device_id': 'c596ce11-db30-5c72-8243-15acaae8690f',
'admin_state_up': True
}
self._update_port_test_helper(
exp,
self.callbacks.plug_vip_port,
host='host'
)
def test_unplug_vip_port(self):
exp = {
'device_owner': '',
'device_id': '',
'admin_state_up': False
}
self._update_port_test_helper(
exp,
self.callbacks.unplug_vip_port,
host='host'
)
class TestLoadBalancerAgentApi(testtools.TestCase):
def setUp(self):
super(TestLoadBalancerAgentApi, self).setUp()
self.addCleanup(mock.patch.stopall)
self.api = plugin.LoadBalancerAgentApi('topic', 'host')
self.mock_cast = mock.patch.object(self.api, 'cast').start()
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
def test_init(self):
self.assertEqual(self.api.topic, 'topic')
self.assertEqual(self.api.host, 'host')
def _call_test_helper(self, method_name):
rv = getattr(self.api, method_name)(mock.sentinel.context, 'the_id')
self.assertEqual(rv, self.mock_cast.return_value)
self.mock_cast.assert_called_once_with(
mock.sentinel.context,
self.mock_msg.return_value,
topic='topic'
)
self.mock_msg.assert_called_once_with(
method_name,
pool_id='the_id',
host='host'
)
def test_reload_pool(self):
self._call_test_helper('reload_pool')
def test_destroy_pool(self):
self._call_test_helper('destroy_pool')
def test_modify_pool(self):
self._call_test_helper('modify_pool')
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
def setUp(self):
self.log = mock.patch.object(plugin, 'LOG')
api_cls = mock.patch.object(plugin, 'LoadBalancerAgentApi').start()
super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
self.mock_api = api_cls.return_value
self.addCleanup(mock.patch.stopall)
def test_create_vip(self):
with self.subnet() as subnet:
with self.pool(subnet=subnet) as pool:
with self.vip(pool=pool, subnet=subnet) as vip:
self.mock_api.reload_pool.assert_called_once_with(
mock.ANY,
vip['vip']['pool_id']
)
def test_update_vip(self):
with self.subnet() as subnet:
with self.pool(subnet=subnet) as pool:
with self.vip(pool=pool, subnet=subnet) as vip:
self.mock_api.reset_mock()
ctx = context.get_admin_context()
vip['vip'].pop('status')
new_vip = self.plugin_instance.update_vip(
ctx,
vip['vip']['id'],
vip
)
self.mock_api.reload_pool.assert_called_once_with(
mock.ANY,
vip['vip']['pool_id']
)
self.assertEqual(
new_vip['status'],
constants.PENDING_UPDATE
)
def t2est_delete_vip(self):
with self.subnet() as subnet:
with self.pool(subnet=subnet) as pool:
with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip:
self.mock_api.reset_mock()
ctx = context.get_admin_context()
self.plugin_instance.delete_vip(context, vip['vip']['id'])
self.mock_api.destroy_pool.assert_called_once_with(
mock.ANY,
vip['vip']['pool_id']
)

View File

@ -70,3 +70,21 @@ class AgentUtilsGetInterfaceMAC(testtools.TestCase):
'\x00' * 232])
actual_val = utils.get_interface_mac('eth0')
self.assertEqual(actual_val, expect_val)
class AgentUtilsReplaceFile(testtools.TestCase):
def test_replace_file(self):
# make file to replace
with mock.patch('tempfile.NamedTemporaryFile') as ntf:
ntf.return_value.name = '/baz'
with mock.patch('os.chmod') as chmod:
with mock.patch('os.rename') as rename:
utils.replace_file('/foo', 'bar')
expected = [mock.call('w+', dir='/', delete=False),
mock.call().write('bar'),
mock.call().close()]
ntf.assert_has_calls(expected)
chmod.assert_called_once_with('/baz', 0644)
rename.assert_called_once_with('/baz', '/foo')

View File

@ -138,22 +138,6 @@ class TestDhcpBase(testtools.TestCase):
def test_base_abc_error(self):
self.assertRaises(TypeError, dhcp.DhcpBase, None)
def test_replace_file(self):
# make file to replace
with mock.patch('tempfile.NamedTemporaryFile') as ntf:
ntf.return_value.name = '/baz'
with mock.patch('os.chmod') as chmod:
with mock.patch('os.rename') as rename:
dhcp.replace_file('/foo', 'bar')
expected = [mock.call('w+', dir='/', delete=False),
mock.call().write('bar'),
mock.call().close()]
ntf.assert_has_calls(expected)
chmod.assert_called_once_with('/baz', 0644)
rename.assert_called_once_with('/baz', '/foo')
def test_restart(self):
class SubClass(dhcp.DhcpBase):
def __init__(self):
@ -212,7 +196,7 @@ class TestBase(testtools.TestCase):
self.conf.set_override('state_path', '')
self.conf.use_namespaces = True
self.replace_p = mock.patch('quantum.agent.linux.dhcp.replace_file')
self.replace_p = mock.patch('quantum.agent.linux.utils.replace_file')
self.execute_p = mock.patch('quantum.agent.linux.utils.execute')
self.addCleanup(self.execute_p.stop)
self.safe = self.replace_p.start()
@ -392,7 +376,7 @@ class TestDhcpLocalProcess(TestBase):
self.assertEqual(lp.interface_name, 'tap0')
def test_set_interface_name(self):
with mock.patch('quantum.agent.linux.dhcp.replace_file') as replace:
with mock.patch('quantum.agent.linux.utils.replace_file') as replace:
lp = LocalChild(self.conf, FakeDualNetwork())
with mock.patch.object(lp, 'get_conf_file_name') as conf_file:
conf_file.return_value = '/interface'

View File

@ -137,6 +137,8 @@ else:
'quantum-debug = quantum.debug.shell:main',
'quantum-ovs-cleanup = quantum.agent.ovs_cleanup_util:main',
'quantum-db-manage = quantum.db.migration.cli:main',
('quantum-lbaas-agent = '
'quantum.plugins.services.agent_loadbalancer.agent:main'),
('quantum-check-nvp-config = '
'quantum.plugins.nicira.nicira_nvp_plugin.check_nvp_config:main'),
]