From 0ade06453cc26f240281fcc04eff26fc9eeb541b Mon Sep 17 00:00:00 2001 From: Mark McClain Date: Sun, 24 Feb 2013 07:55:06 -0500 Subject: [PATCH] LBaaS Agent Reference Implementation implements blueprint lbaas-namespace-agent This a reference implemention of the Quantum load balancing service using HAProxy. The implemention is designed for vendors, developers, and deployers to become familiar with the API and service workflow. This change also adds some constraint checks for data integrity. Change-Id: I10a67da11840477ccf063b98149f4f77248802a1 --- bin/quantum-lbaas-agent | 26 ++ etc/lbaas_agent.ini | 24 ++ etc/quantum/rootwrap.d/lbaas-haproxy.filters | 29 ++ quantum/agent/linux/dhcp.py | 24 +- quantum/agent/linux/utils.py | 18 + quantum/common/topics.py | 2 + quantum/db/loadbalancer/loadbalancer_db.py | 86 +++-- .../versions/54c2c487e913_lbaas.py | 1 + quantum/extensions/loadbalancer.py | 2 + .../__init__.py | 0 .../agent_loadbalancer/agent/__init__.py | 67 ++++ .../services/agent_loadbalancer/agent/api.py | 81 ++++ .../agent_loadbalancer/agent/manager.py | 221 +++++++++++ .../services/agent_loadbalancer/constants.py | 33 ++ .../agent_loadbalancer/drivers/__init__.py | 17 + .../drivers/haproxy/__init__.py | 17 + .../agent_loadbalancer/drivers/haproxy/cfg.py | 184 +++++++++ .../drivers/haproxy/namespace_driver.py | 182 +++++++++ .../services/agent_loadbalancer/plugin.py | 338 ++++++++++++++++ .../loadbalancer/loadbalancerPlugin.py | 252 ------------ .../db/loadbalancer/test_db_loadbalancer.py | 9 +- quantum/tests/unit/services/__init__.py | 17 + .../services/agent_loadbalancer/__init__.py | 17 + .../agent_loadbalancer/agent/__init__.py | 17 + .../agent_loadbalancer/agent/test_api.py | 135 +++++++ .../agent_loadbalancer/agent/test_init.py | 55 +++ .../agent_loadbalancer/agent/test_manager.py | 365 ++++++++++++++++++ .../agent_loadbalancer/driver/__init__.py | 17 + .../driver/haproxy/__init__.py | 17 + .../driver/haproxy/test_namespace_driver.py | 131 +++++++ .../agent_loadbalancer/test_plugin.py | 263 +++++++++++++ quantum/tests/unit/test_agent_linux_utils.py | 18 + quantum/tests/unit/test_linux_dhcp.py | 20 +- setup.py | 2 + 34 files changed, 2359 insertions(+), 328 deletions(-) create mode 100755 bin/quantum-lbaas-agent create mode 100644 etc/lbaas_agent.ini create mode 100644 etc/quantum/rootwrap.d/lbaas-haproxy.filters rename quantum/plugins/services/{loadbalancer => agent_loadbalancer}/__init__.py (100%) create mode 100644 quantum/plugins/services/agent_loadbalancer/agent/__init__.py create mode 100644 quantum/plugins/services/agent_loadbalancer/agent/api.py create mode 100644 quantum/plugins/services/agent_loadbalancer/agent/manager.py create mode 100644 quantum/plugins/services/agent_loadbalancer/constants.py create mode 100644 quantum/plugins/services/agent_loadbalancer/drivers/__init__.py create mode 100644 quantum/plugins/services/agent_loadbalancer/drivers/haproxy/__init__.py create mode 100644 quantum/plugins/services/agent_loadbalancer/drivers/haproxy/cfg.py create mode 100644 quantum/plugins/services/agent_loadbalancer/drivers/haproxy/namespace_driver.py create mode 100644 quantum/plugins/services/agent_loadbalancer/plugin.py delete mode 100644 quantum/plugins/services/loadbalancer/loadbalancerPlugin.py create mode 100644 quantum/tests/unit/services/__init__.py create mode 100644 quantum/tests/unit/services/agent_loadbalancer/__init__.py create mode 100644 quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py create mode 100644 quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py create mode 100644 quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py create mode 100644 quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py create mode 100644 quantum/tests/unit/services/agent_loadbalancer/driver/__init__.py create mode 100644 quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/__init__.py create mode 100644 quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_namespace_driver.py create mode 100644 quantum/tests/unit/services/agent_loadbalancer/test_plugin.py diff --git a/bin/quantum-lbaas-agent b/bin/quantum-lbaas-agent new file mode 100755 index 0000000000..a53e4574b2 --- /dev/null +++ b/bin/quantum-lbaas-agent @@ -0,0 +1,26 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 Openstack, LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import sys +sys.path.insert(0, os.getcwd()) + +from quantum.plugins.services.agent_loadbalancer.agent import main + + +main() diff --git a/etc/lbaas_agent.ini b/etc/lbaas_agent.ini new file mode 100644 index 0000000000..e3ea75c3d3 --- /dev/null +++ b/etc/lbaas_agent.ini @@ -0,0 +1,24 @@ +[DEFAULT] +# Show debugging output in log (sets DEBUG log level output) +# debug = true + +# The LBaaS agent will resync its state with Quantum to recover from any +# transient notification or rpc errors. The interval is number of +# seconds between attempts. +# periodic_interval = 10 + +# OVS based plugins(OVS, Ryu, NEC, NVP, BigSwitch/Floodlight) +interface_driver = quantum.agent.linux.interface.OVSInterfaceDriver +# OVS based plugins(Ryu, NEC, NVP, BigSwitch/Floodlight) that use OVS +# as OpenFlow switch and check port status +# ovs_use_veth = True +# LinuxBridge +# interface_driver = quantum.agent.linux.interface.BridgeInterfaceDriver + +# The agent requires a driver to manage the loadbalancer. HAProxy is the +# opensource version. +device_driver = quantum.plugins.services.agent_loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver + +# Allow overlapping IP (Must have kernel build with CONFIG_NET_NS=y and +# iproute2 package that supports namespaces). +# use_namespaces = True diff --git a/etc/quantum/rootwrap.d/lbaas-haproxy.filters b/etc/quantum/rootwrap.d/lbaas-haproxy.filters new file mode 100644 index 0000000000..e00a7197ae --- /dev/null +++ b/etc/quantum/rootwrap.d/lbaas-haproxy.filters @@ -0,0 +1,29 @@ +# quantum-rootwrap command filters for nodes on which quantum is +# expected to control network +# +# This file should be owned by (and only-writeable by) the root user + +# format seems to be +# cmd-name: filter-name, raw-command, user, args + +[Filters] + +# haproxy +haproxy: CommandFilter, /usr/sbin/haproxy, root + +# lbaas-agent uses kill as well, that's handled by the generic KillFilter +kill_haproxy_usr: KillFilter, root, /usr/sbin/haproxy, -9, -HUP + +# lbaas-agent uses cat +cat: RegExpFilter, /bin/cat, root, cat, /proc/\d+/cmdline + +ovs-vsctl: CommandFilter, /bin/ovs-vsctl, root +ovs-vsctl_usr: CommandFilter, /usr/bin/ovs-vsctl, root +ovs-vsctl_sbin: CommandFilter, /sbin/ovs-vsctl, root +ovs-vsctl_sbin_usr: CommandFilter, /usr/sbin/ovs-vsctl, root + +# ip_lib +ip: IpFilter, /sbin/ip, root +ip_usr: IpFilter, /usr/sbin/ip, root +ip_exec: IpNetnsExecFilter, /sbin/ip, root +ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root diff --git a/quantum/agent/linux/dhcp.py b/quantum/agent/linux/dhcp.py index 0c05fcaabb..38e7fd273f 100644 --- a/quantum/agent/linux/dhcp.py +++ b/quantum/agent/linux/dhcp.py @@ -21,7 +21,6 @@ import re import socket import StringIO import sys -import tempfile import netaddr from oslo.config import cfg @@ -187,7 +186,7 @@ class DhcpLocalProcess(DhcpBase): def interface_name(self, value): interface_file_path = self.get_conf_file_name('interface', ensure_conf_dir=True) - replace_file(interface_file_path, value) + utils.replace_file(interface_file_path, value) @abc.abstractmethod def spawn_process(self): @@ -298,7 +297,7 @@ class Dnsmasq(DhcpLocalProcess): (port.mac_address, name, alloc.ip_address)) name = self.get_conf_file_name('host') - replace_file(name, buf.getvalue()) + utils.replace_file(name, buf.getvalue()) return name def _output_opts_file(self): @@ -344,7 +343,7 @@ class Dnsmasq(DhcpLocalProcess): options.append(self._format_option(i, 'router')) name = self.get_conf_file_name('opts') - replace_file(name, '\n'.join(options)) + utils.replace_file(name, '\n'.join(options)) return name def _make_subnet_interface_ip_map(self): @@ -402,20 +401,3 @@ class Dnsmasq(DhcpLocalProcess): sock.connect(dhcp_relay_socket) sock.send(jsonutils.dumps(data)) sock.close() - - -def replace_file(file_name, data): - """Replaces the contents of file_name with data in a safe manner. - - First write to a temp file and then rename. Since POSIX renames are - atomic, the file is unlikely to be corrupted by competing writes. - - We create the tempfile on the same device to ensure that it can be renamed. - """ - - base_dir = os.path.dirname(os.path.abspath(file_name)) - tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False) - tmp_file.write(data) - tmp_file.close() - os.chmod(tmp_file.name, 0644) - os.rename(tmp_file.name, file_name) diff --git a/quantum/agent/linux/utils.py b/quantum/agent/linux/utils.py index a16d4815e1..cbfefffebe 100644 --- a/quantum/agent/linux/utils.py +++ b/quantum/agent/linux/utils.py @@ -22,6 +22,7 @@ import os import shlex import socket import struct +import tempfile from eventlet.green import subprocess @@ -71,3 +72,20 @@ def get_interface_mac(interface): struct.pack('256s', interface[:DEVICE_NAME_LEN])) return ''.join(['%02x:' % ord(char) for char in info[MAC_START:MAC_END]])[:-1] + + +def replace_file(file_name, data): + """Replaces the contents of file_name with data in a safe manner. + + First write to a temp file and then rename. Since POSIX renames are + atomic, the file is unlikely to be corrupted by competing writes. + + We create the tempfile on the same device to ensure that it can be renamed. + """ + + base_dir = os.path.dirname(os.path.abspath(file_name)) + tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False) + tmp_file.write(data) + tmp_file.close() + os.chmod(tmp_file.name, 0644) + os.rename(tmp_file.name, file_name) diff --git a/quantum/common/topics.py b/quantum/common/topics.py index 91970f2f98..4a25549586 100644 --- a/quantum/common/topics.py +++ b/quantum/common/topics.py @@ -25,9 +25,11 @@ UPDATE = 'update' AGENT = 'q-agent-notifier' PLUGIN = 'q-plugin' DHCP = 'q-dhcp-notifer' +LOADBALANCER_PLUGIN = 'q-loadbalancer-plugin' L3_AGENT = 'l3_agent' DHCP_AGENT = 'dhcp_agent' +LOADBALANCER_AGENT = 'loadbalancer_agent' def get_topic_name(prefix, table, operation): diff --git a/quantum/db/loadbalancer/loadbalancer_db.py b/quantum/db/loadbalancer/loadbalancer_db.py index 69c9bc6003..7158cfe104 100644 --- a/quantum/db/loadbalancer/loadbalancer_db.py +++ b/quantum/db/loadbalancer/loadbalancer_db.py @@ -69,7 +69,7 @@ class Vip(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): protocol_port = sa.Column(sa.Integer, nullable=False) protocol = sa.Column(sa.Enum("HTTP", "HTTPS", "TCP", name="lb_protocols"), nullable=False) - pool_id = sa.Column(sa.String(36), nullable=False) + pool_id = sa.Column(sa.String(36), nullable=False, unique=True) session_persistence = orm.relationship(SessionPersistence, uselist=False, backref="vips", @@ -114,6 +114,7 @@ class Pool(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): cascade="all, delete-orphan") monitors = orm.relationship("PoolMonitorAssociation", backref="pools", cascade="all, delete-orphan") + vip = orm.relationship(Vip, backref='pool') class HealthMonitor(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): @@ -239,6 +240,12 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): raise return r + def assert_modification_allowed(self, obj): + status = getattr(obj, 'status', None) + + if status == constants.PENDING_DELETE: + raise loadbalancer.StateInvalid(id=id, state=status) + ######################################################## # VIP DB access def _make_vip_dict(self, vip, fields=None): @@ -270,11 +277,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): return self._fields(res, fields) - def _update_pool_vip_info(self, context, pool_id, vip_id): - pool_db = self._get_resource(context, Pool, pool_id) - with context.session.begin(subtransactions=True): - pool_db.update({'vip_id': vip_id}) - def _check_session_persistence_info(self, info): """ Performs sanity check on session persistence info. :param info: Session persistence info @@ -355,6 +357,14 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): tenant_id = self._get_tenant_id_for_create(context, v) with context.session.begin(subtransactions=True): + # validate that the pool has same tenant + if v['pool_id']: + pool = self._get_resource(context, Pool, v['pool_id']) + if pool['tenant_id'] != tenant_id: + raise q_exc.NotAuthorized() + else: + pool = None + vip_db = Vip(id=uuidutils.generate_uuid(), tenant_id=tenant_id, name=v['name'], @@ -367,16 +377,18 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): admin_state_up=v['admin_state_up'], status=constants.PENDING_CREATE) - vip_id = vip_db['id'] session_info = v['session_persistence'] if session_info: - s_p = self._create_session_persistence_db(session_info, vip_id) + s_p = self._create_session_persistence_db( + session_info, + vip_db['id']) vip_db.session_persistence = s_p context.session.add(vip_db) context.session.flush() + # create a port to reserve address for IPAM self._create_port_for_vip( context, vip_db, @@ -384,7 +396,9 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): v.get('address') ) - self._update_pool_vip_info(context, v['pool_id'], vip_id) + if pool: + pool['vip_id'] = vip_db['id'] + return self._make_vip_dict(vip_db) def update_vip(self, context, id, vip): @@ -392,20 +406,36 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): sess_persist = v.pop('session_persistence', None) with context.session.begin(subtransactions=True): + vip_db = self._get_resource(context, Vip, id) + + self.assert_modification_allowed(vip_db) + if sess_persist: self._update_vip_session_persistence(context, id, sess_persist) else: self._delete_session_persistence(context, id) - vip_db = self._get_resource(context, Vip, id) - old_pool_id = vip_db['pool_id'] if v: vip_db.update(v) # If the pool_id is changed, we need to update # the associated pools if 'pool_id' in v: - self._update_pool_vip_info(context, old_pool_id, None) - self._update_pool_vip_info(context, v['pool_id'], id) + new_pool = self._get_resource(context, Pool, v['pool_id']) + self.assert_modification_allowed(new_pool) + + # check that the pool matches the tenant_id + if new_pool['tenant_id'] != vip_db['tenant_id']: + raise q_exc.NotAuthorized() + + if vip_db['pool_id']: + old_pool = self._get_resource( + context, + Pool, + vip_db['pool_id'] + ) + old_pool['vip_id'] = None + + new_pool['vip_id'] = vip_db['id'] return self._make_vip_dict(vip_db) @@ -432,7 +462,7 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): ######################################################## # Pool DB access - def _make_pool_dict(self, context, pool, fields=None): + def _make_pool_dict(self, pool, fields=None): res = {'id': pool['id'], 'tenant_id': pool['tenant_id'], 'name': pool['name'], @@ -453,16 +483,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): return self._fields(res, fields) - def _update_pool_member_info(self, context, pool_id, membersInfo): - with context.session.begin(subtransactions=True): - member_qry = context.session.query(Member) - for member_id in membersInfo: - try: - member = member_qry.filter_by(id=member_id).one() - member.update({'pool_id': pool_id}) - except exc.NoResultFound: - raise loadbalancer.MemberNotFound(member_id=member_id) - def _create_pool_stats(self, context, pool_id): # This is internal method to add pool statistics. It won't # be exposed to API @@ -504,17 +524,17 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): context.session.add(pool_db) pool_db = self._get_resource(context, Pool, pool_db['id']) - return self._make_pool_dict(context, pool_db) + return self._make_pool_dict(pool_db) def update_pool(self, context, id, pool): - v = pool['pool'] + p = pool['pool'] with context.session.begin(subtransactions=True): pool_db = self._get_resource(context, Pool, id) - if v: - pool_db.update(v) + if p: + pool_db.update(p) - return self._make_pool_dict(context, pool_db) + return self._make_pool_dict(pool_db) def delete_pool(self, context, id): # Check if the pool is in use @@ -529,15 +549,15 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): def get_pool(self, context, id, fields=None): pool = self._get_resource(context, Pool, id) - return self._make_pool_dict(context, pool, fields) + return self._make_pool_dict(pool, fields) def get_pools(self, context, filters=None, fields=None): collection = self._model_query(context, Pool) collection = self._apply_filters_to_query(collection, Pool, filters) - return [self._make_pool_dict(context, c, fields) + return [self._make_pool_dict(c, fields) for c in collection.all()] - def get_stats(self, context, pool_id): + def stats(self, context, pool_id): with context.session.begin(subtransactions=True): pool_qry = context.session.query(Pool) try: @@ -600,6 +620,7 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): raise loadbalancer.HealthMonitorNotFound(monitor_id=id) def get_pool_health_monitor(self, context, id, pool_id, fields=None): + # TODO(markmcclain) look into why pool_id is ignored healthmonitor = self._get_resource(context, HealthMonitor, id) return self._make_health_monitor_dict(healthmonitor, fields) @@ -644,7 +665,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase): v = member['member'] with context.session.begin(subtransactions=True): member_db = self._get_resource(context, Member, id) - old_pool_id = member_db['pool_id'] if v: member_db.update(v) diff --git a/quantum/db/migration/alembic_migrations/versions/54c2c487e913_lbaas.py b/quantum/db/migration/alembic_migrations/versions/54c2c487e913_lbaas.py index 799300313a..09ed5208dc 100644 --- a/quantum/db/migration/alembic_migrations/versions/54c2c487e913_lbaas.py +++ b/quantum/db/migration/alembic_migrations/versions/54c2c487e913_lbaas.py @@ -58,6 +58,7 @@ def upgrade(active_plugin=None, options=None): sa.Column(u'admin_state_up', sa.Boolean(), nullable=False), sa.Column(u'connection_limit', sa.Integer(), nullable=True), sa.ForeignKeyConstraint(['port_id'], ['ports.id'], ), + sa.UniqueConstraint('pool_id'), sa.PrimaryKeyConstraint(u'id') ) op.create_table( diff --git a/quantum/extensions/loadbalancer.py b/quantum/extensions/loadbalancer.py index e1e6944d6b..75e68a6a6b 100644 --- a/quantum/extensions/loadbalancer.py +++ b/quantum/extensions/loadbalancer.py @@ -68,6 +68,7 @@ RESOURCE_ATTRIBUTE_MAP = { 'is_visible': True}, 'name': {'allow_post': True, 'allow_put': True, 'validate': {'type:string': None}, + 'default': '', 'is_visible': True}, 'description': {'allow_post': True, 'allow_put': True, 'validate': {'type:string': None}, @@ -128,6 +129,7 @@ RESOURCE_ATTRIBUTE_MAP = { 'is_visible': True}, 'name': {'allow_post': True, 'allow_put': True, 'validate': {'type:string': None}, + 'default': '', 'is_visible': True}, 'description': {'allow_post': True, 'allow_put': True, 'validate': {'type:string': None}, diff --git a/quantum/plugins/services/loadbalancer/__init__.py b/quantum/plugins/services/agent_loadbalancer/__init__.py similarity index 100% rename from quantum/plugins/services/loadbalancer/__init__.py rename to quantum/plugins/services/agent_loadbalancer/__init__.py diff --git a/quantum/plugins/services/agent_loadbalancer/agent/__init__.py b/quantum/plugins/services/agent_loadbalancer/agent/__init__.py new file mode 100644 index 0000000000..3632729c0b --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/agent/__init__.py @@ -0,0 +1,67 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import eventlet +from oslo.config import cfg + +from quantum.agent.common import config +from quantum.agent.linux import interface +from quantum.common import topics +from quantum.openstack.common.rpc import service as rpc_service +from quantum.openstack.common import service +from quantum.plugins.services.agent_loadbalancer.agent import manager + + +OPTS = [ + cfg.IntOpt( + 'periodic_interval', + default=10, + help=_('Seconds between periodic task runs') + ) +] + + +class LbaasAgentService(rpc_service.Service): + def start(self): + super(LbaasAgentService, self).start() + self.tg.add_timer( + cfg.CONF.periodic_interval, + self.manager.run_periodic_tasks, + None, + None + ) + + +def main(): + eventlet.monkey_patch() + cfg.CONF.register_opts(OPTS) + cfg.CONF.register_opts(manager.OPTS) + # import interface options just in case the driver uses namespaces + cfg.CONF.register_opts(interface.OPTS) + config.register_root_helper(cfg.CONF) + + cfg.CONF(project='quantum') + config.setup_logging(cfg.CONF) + + mgr = manager.LbaasAgentManager(cfg.CONF) + svc = LbaasAgentService( + host=cfg.CONF.host, + topic=topics.LOADBALANCER_AGENT, + manager=mgr + ) + service.launch(svc).wait() diff --git a/quantum/plugins/services/agent_loadbalancer/agent/api.py b/quantum/plugins/services/agent_loadbalancer/agent/api.py new file mode 100644 index 0000000000..cd4314d3dc --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/agent/api.py @@ -0,0 +1,81 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +from quantum.openstack.common.rpc import proxy + + +class LbaasAgentApi(proxy.RpcProxy): + """Agent side of the Agent to Plugin RPC API.""" + + API_VERSION = '1.0' + + def __init__(self, topic, context, host): + super(LbaasAgentApi, self).__init__(topic, self.API_VERSION) + self.context = context + self.host = host + + def get_ready_devices(self): + return self.call( + self.context, + self.make_msg('get_ready_devices', host=self.host), + topic=self.topic + ) + + def get_logical_device(self, pool_id): + return self.call( + self.context, + self.make_msg( + 'get_logical_device', + pool_id=pool_id, + host=self.host + ), + topic=self.topic + ) + + def pool_destroyed(self, pool_id): + return self.call( + self.context, + self.make_msg('pool_destroyed', pool_id=pool_id, host=self.host), + topic=self.topic + ) + + def plug_vip_port(self, port_id): + return self.call( + self.context, + self.make_msg('plug_vip_port', port_id=port_id, host=self.host), + topic=self.topic + ) + + def unplug_vip_port(self, port_id): + return self.call( + self.context, + self.make_msg('unplug_vip_port', port_id=port_id, host=self.host), + topic=self.topic + ) + + def update_pool_stats(self, pool_id, stats): + return self.call( + self.context, + self.make_msg( + 'update_pool_stats', + pool_id=pool_id, + stats=stats, + host=self.host + ), + topic=self.topic + ) diff --git a/quantum/plugins/services/agent_loadbalancer/agent/manager.py b/quantum/plugins/services/agent_loadbalancer/agent/manager.py new file mode 100644 index 0000000000..9dd2a70d36 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/agent/manager.py @@ -0,0 +1,221 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import weakref + +from oslo.config import cfg + +from quantum.agent.common import config +from quantum.common import topics +from quantum import context +from quantum.openstack.common import importutils +from quantum.openstack.common import log as logging +from quantum.openstack.common import periodic_task +from quantum.plugins.services.agent_loadbalancer.agent import api + +LOG = logging.getLogger(__name__) +NS_PREFIX = 'qlbaas-' + +OPTS = [ + cfg.StrOpt( + 'device_driver', + help=_('The driver used to manage the loadbalancing device'), + ), + cfg.StrOpt( + 'loadbalancer_state_path', + default='$state_path/lbaas', + help=_('Location to store config and state files'), + ), + cfg.StrOpt( + 'interface_driver', + help=_('The driver used to manage the virtual interface') + ) +] + + +class LogicalDeviceCache(object): + """Manage a cache of known devices.""" + + class Device(object): + """Inner classes used to hold values for weakref lookups""" + def __init__(self, port_id, pool_id): + self.port_id = port_id + self.pool_id = pool_id + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + def __hash__(self): + return hash((self.port_id, self.pool_id)) + + def __init__(self): + self.devices = set() + self.port_lookup = weakref.WeakValueDictionary() + self.pool_lookup = weakref.WeakValueDictionary() + + def put(self, device): + port_id = device['vip']['port_id'] + pool_id = device['pool']['id'] + d = self.Device(device['vip']['port_id'], device['pool']['id']) + if d not in self.devices: + self.devices.add(d) + self.port_lookup[port_id] = d + self.pool_lookup[pool_id] = d + + def remove(self, device): + if not isinstance(device, self.Device): + device = self.Device( + device['vip']['port_id'], device['pool']['id'] + ) + if device in self.devices: + self.devices.remove(device) + + def remove_by_pool_id(self, pool_id): + d = self.pool_lookup.get(pool_id) + if d: + self.devices.remove(d) + + def get_by_pool_id(self, pool_id): + return self.pool_lookup.get(pool_id) + + def get_by_port_id(self, port_id): + return self.port_lookup.get(port_id) + + def get_pool_ids(self): + return self.pool_lookup.keys() + + +class LbaasAgentManager(periodic_task.PeriodicTasks): + def __init__(self, conf): + self.conf = conf + try: + vif_driver = importutils.import_object(conf.interface_driver, conf) + except ImportError: + # the driver is optional + msg = _('Error importing interface driver: %s') + raise SystemExit(msg % conf.interface_driver) + vif_driver = None + + try: + self.driver = importutils.import_object( + conf.device_driver, + config.get_root_helper(self.conf), + conf.loadbalancer_state_path, + vif_driver, + self._vip_plug_callback + ) + except ImportError: + msg = _('Error importing loadbalancer device driver: %s') + raise SystemExit(msg % conf.device_driver) + ctx = context.get_admin_context_without_session() + self.plugin_rpc = api.LbaasAgentApi( + topics.LOADBALANCER_PLUGIN, + ctx, + conf.host + ) + self.needs_resync = False + self.cache = LogicalDeviceCache() + + def initialize_service_hook(self, started_by): + self.sync_state() + + @periodic_task.periodic_task + def periodic_resync(self, context): + if self.needs_resync: + self.needs_resync = False + self.sync_state() + + @periodic_task.periodic_task(ticks_between_runs=6) + def collect_stats(self, context): + for pool_id in self.cache.get_pool_ids(): + try: + stats = self.driver.get_stats(pool_id) + if stats: + self.plugin_rpc.update_pool_stats(pool_id, stats) + except Exception: + LOG.exception(_('Error upating stats')) + self.needs_resync = True + + def _vip_plug_callback(self, action, port): + if action == 'plug': + self.plugin_rpc.plug_vip_port(port['id']) + elif action == 'unplug': + self.plugin_rpc.unplug_vip_port(port['id']) + + def sync_state(self): + known_devices = set(self.cache.get_pool_ids()) + try: + ready_logical_devices = set(self.plugin_rpc.get_ready_devices()) + + for deleted_id in known_devices - ready_logical_devices: + self.destroy_device(deleted_id) + + for pool_id in ready_logical_devices: + self.refresh_device(pool_id) + + except Exception: + LOG.exception(_('Unable to retrieve ready devices')) + self.needs_resync = True + + self.remove_orphans() + + def refresh_device(self, pool_id): + try: + logical_config = self.plugin_rpc.get_logical_device(pool_id) + + if self.driver.exists(pool_id): + self.driver.update(logical_config) + else: + self.driver.create(logical_config) + self.cache.put(logical_config) + except Exception: + LOG.exception(_('Unable to refresh device for pool: %s'), pool_id) + self.needs_resync = True + + def destroy_device(self, pool_id): + device = self.cache.get_by_pool_id(pool_id) + if not device: + return + try: + self.driver.destroy(pool_id) + self.plugin_rpc.pool_destroyed(pool_id) + except Exception: + LOG.exception(_('Unable to destroy device for pool: %s'), pool_id) + self.needs_resync = True + self.cache.remove(device) + + def remove_orphans(self): + try: + self.driver.remove_orphans(self.cache.get_pool_ids()) + except NotImplementedError: + pass # Not all drivers will support this + + def reload_pool(self, context, pool_id=None, host=None): + """Handle RPC cast from plugin to reload a pool.""" + if pool_id: + self.refresh_device(pool_id) + + def modify_pool(self, context, pool_id=None, host=None): + """Handle RPC cast from plugin to modify a pool if known to agent.""" + if self.cache.get_by_pool_id(pool_id): + self.refresh_device(pool_id) + + def destroy_pool(self, context, pool_id=None, host=None): + """Handle RPC cast from plugin to destroy a pool if known to agent.""" + if self.cache.get_by_pool_id(pool_id): + self.destroy_device(pool_id) diff --git a/quantum/plugins/services/agent_loadbalancer/constants.py b/quantum/plugins/services/agent_loadbalancer/constants.py new file mode 100644 index 0000000000..82b049f173 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/constants.py @@ -0,0 +1,33 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +LB_METHOD_ROUND_ROBIN = 'ROUND_ROBIN' +LB_METHOD_LEAST_CONNECTIONS = 'LEAST_CONNECTIONS' +LB_METHOD_SOURCE_IP = 'SOURCE_IP' + +PROTOCOL_TCP = 'TCP' +PROTOCOL_HTTP = 'HTTP' +PROTOCOL_HTTPS = 'HTTPS' + +HEALTH_MONITOR_PING = 'PING' +HEALTH_MONITOR_TCP = 'TCP' +HEALTH_MONITOR_HTTP = 'HTTP' +HEALTH_MONITOR_HTTPS = 'HTTPS' + +SESSION_PERSISTENCE_SOURCE_IP = 'SOURCE_IP' +SESSION_PERSISTENCE_HTTP_COOKIE = 'HTTP_COOKIE' +SESSION_PERSISTENCE_APP_COOKIE = 'APP_COOKIE' diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/__init__.py b/quantum/plugins/services/agent_loadbalancer/drivers/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/drivers/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/__init__.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/cfg.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/cfg.py new file mode 100644 index 0000000000..936bcff303 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/cfg.py @@ -0,0 +1,184 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import itertools + +from quantum.agent.linux import utils +from quantum.plugins.common import constants as qconstants +from quantum.plugins.services.agent_loadbalancer import constants + + +PROTOCOL_MAP = { + constants.PROTOCOL_TCP: 'tcp', + constants.PROTOCOL_HTTP: 'http', + constants.PROTOCOL_HTTPS: 'tcp', +} + +BALANCE_MAP = { + constants.LB_METHOD_ROUND_ROBIN: 'roundrobin', + constants.LB_METHOD_LEAST_CONNECTIONS: 'leastconn', + constants.LB_METHOD_SOURCE_IP: 'source' +} + +ACTIVE = qconstants.ACTIVE + + +def save_config(conf_path, logical_config, socket_path=None): + """Convert a logical configuration to the HAProxy version""" + data = [] + data.extend(_build_global(logical_config, socket_path=socket_path)) + data.extend(_build_defaults(logical_config)) + data.extend(_build_frontend(logical_config)) + data.extend(_build_backend(logical_config)) + utils.replace_file(conf_path, '\n'.join(data)) + + +def _build_global(config, socket_path=None): + opts = [ + 'daemon', + 'user nobody', + 'group nogroup', + 'log /dev/log local0', + 'log /dev/log local1 notice' + ] + + if socket_path: + opts.append('stats socket %s mode 0666 level user' % socket_path) + + return itertools.chain(['global'], ('\t' + o for o in opts)) + + +def _build_defaults(config): + opts = [ + 'log global', + 'retries 3', + 'option redispatch', + 'timeout connect 5000', + 'timeout client 50000', + 'timeout server 50000', + ] + + return itertools.chain(['defaults'], ('\t' + o for o in opts)) + + +def _build_frontend(config): + protocol = config['vip']['protocol'] + + opts = [ + 'option tcplog', + 'bind %s:%d' % ( + _get_first_ip_from_port(config['vip']['port']), + config['vip']['protocol_port'] + ), + 'mode %s' % PROTOCOL_MAP[protocol], + 'default_backend %s' % config['pool']['id'], + ] + + if config['vip']['connection_limit'] >= 0: + opts.append('maxconn %s' % config['vip']['connection_limit']) + + if protocol == constants.PROTOCOL_HTTP: + opts.append('option forwardfor') + + return itertools.chain( + ['frontend %s' % config['vip']['id']], + ('\t' + o for o in opts) + ) + + +def _build_backend(config): + protocol = config['pool']['protocol'] + lb_method = config['pool']['lb_method'] + + opts = [ + 'mode %s' % PROTOCOL_MAP[protocol], + 'balance %s' % BALANCE_MAP.get(lb_method, 'roundrobin') + ] + + if protocol == constants.PROTOCOL_HTTP: + opts.append('option forwardfor') + + # add the first health_monitor (if available) + server_addon, health_opts = _get_server_health_option(config) + opts.extend(health_opts) + + # add the members + opts.extend( + (('server %(id)s %(address)s:%(protocol_port)s ' + 'weight %(weight)s') % member) + server_addon + for member in config['members'] + if (member['status'] == ACTIVE and member['admin_state_up']) + ) + + return itertools.chain( + ['backend %s' % config['pool']['id']], + ('\t' + o for o in opts) + ) + + +def _get_first_ip_from_port(port): + for fixed_ip in port['fixed_ips']: + return fixed_ip['ip_address'] + + +def _get_server_health_option(config): + """return the first active health option""" + for monitor in config['healthmonitors']: + if monitor['status'] == ACTIVE and monitor['admin_state_up']: + break + else: + return '', [] + + server_addon = ' check inter %(delay)ds fall %(max_retries)d' % monitor + opts = [ + 'timeout check %ds' % monitor['timeout'] + ] + + if monitor['type'] in (constants.HEALTH_MONITOR_HTTP, + constants.HEALTH_MONITOR_HTTPS): + opts.append('option httpchk %(http_method)s %(url_path)s' % monitor) + opts.append( + 'http-check expect rstatus %s' % + '|'.join(_expand_expected_codes(monitor['expected_codes'])) + ) + + if monitor['type'] == constants.HEALTH_MONITOR_HTTPS: + opts.append('option ssl-hello-chk') + + return server_addon, opts + + +def _expand_expected_codes(codes): + """Expand the expected code string in set of codes. + + 200-204 -> 200, 201, 202, 204 + 200, 203 -> 200, 203 + """ + + retval = set() + for code in codes.replace(',', ' ').split(' '): + code = code.strip() + + if not code: + continue + elif '-' in code: + low, hi = code.split('-')[:2] + retval.update(str(i) for i in xrange(int(low), int(hi))) + else: + retval.add(code) + return retval diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/namespace_driver.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/namespace_driver.py new file mode 100644 index 0000000000..f4df283d31 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/namespace_driver.py @@ -0,0 +1,182 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost +import os +import shutil +import socket + +import netaddr + +from quantum.agent.linux import ip_lib +from quantum.common import exceptions +from quantum.openstack.common import log as logging +from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import ( + cfg as hacfg +) + +LOG = logging.getLogger(__name__) +NS_PREFIX = 'qlbaas-' + + +class HaproxyNSDriver(object): + def __init__(self, root_helper, state_path, vif_driver, vip_plug_callback): + self.root_helper = root_helper + self.state_path = state_path + self.vif_driver = vif_driver + self.vip_plug_callback = vip_plug_callback + self.pool_to_port_id = {} + + def create(self, logical_config): + pool_id = logical_config['pool']['id'] + namespace = get_ns_name(pool_id) + + self._plug(namespace, logical_config['vip']['port']) + self._spawn(logical_config) + + def update(self, logical_config): + pool_id = logical_config['pool']['id'] + pid_path = self._get_state_file_path(pool_id, 'pid') + + extra_args = ['-sf'] + extra_args.extend(p.strip() for p in open(pid_path, 'r')) + self._spawn(logical_config, extra_args) + + def _spawn(self, logical_config, extra_cmd_args=()): + pool_id = logical_config['pool']['id'] + namespace = get_ns_name(pool_id) + conf_path = self._get_state_file_path(pool_id, 'conf') + pid_path = self._get_state_file_path(pool_id, 'pid') + sock_path = self._get_state_file_path(pool_id, 'sock') + + hacfg.save_config(conf_path, logical_config, sock_path) + cmd = ['haproxy', '-f', conf_path, '-p', pid_path] + cmd.extend(extra_cmd_args) + + ns = ip_lib.IPWrapper(self.root_helper, namespace) + ns.netns.execute(cmd) + + # remember the pool<>port mapping + self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id'] + + def destroy(self, pool_id): + namespace = get_ns_name(pool_id) + ns = ip_lib.IPWrapper(self.root_helper, namespace) + pid_path = self._get_state_file_path(pool_id, 'pid') + sock_path = self._get_state_file_path(pool_id, 'sock') + + # kill the process + kill_pids_in_file(ns, pid_path) + + # unplug the ports + if pool_id in self.pool_to_port_id: + self._unplug(namespace, self.pool_to_port_id[pool_id]) + + # remove the configuration directory + conf_dir = os.path.dirname(self._get_state_file_path(pool_id, '')) + if os.path.isdir(conf_dir): + shutil.rmtree(conf_dir) + ns.garbage_collect_namespace() + + def exists(self, pool_id): + namespace = get_ns_name(pool_id) + root_ns = ip_lib.IPWrapper(self.root_helper) + + socket_path = self._get_state_file_path(pool_id, 'sock') + if root_ns.netns.exists(namespace) and os.path.exists(socket_path): + try: + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(socket_path) + return True + except socket.error: + pass + return False + + def get_stats(self, pool_id): + pass + + def remove_orphans(self, known_pool_ids): + raise NotImplementedError() + + def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True): + """Returns the file name for a given kind of config file.""" + confs_dir = os.path.abspath(os.path.normpath(self.state_path)) + conf_dir = os.path.join(confs_dir, pool_id) + if ensure_state_dir: + if not os.path.isdir(conf_dir): + os.makedirs(conf_dir, 0755) + return os.path.join(conf_dir, kind) + + def _plug(self, namespace, port, reuse_existing=True): + self.vip_plug_callback('plug', port) + interface_name = self.vif_driver.get_device_name(Wrap(port)) + + if ip_lib.device_exists(interface_name, self.root_helper, namespace): + if not reuse_existing: + raise exceptions.PreexistingDeviceFailure( + dev_name=interface_name + ) + else: + self.vif_driver.plug( + port['network_id'], + port['id'], + interface_name, + port['mac_address'], + namespace=namespace + ) + + cidrs = [ + '%s/%s' % (ip['ip_address'], + netaddr.IPNetwork(ip['subnet']['cidr']).prefixlen) + for ip in port['fixed_ips'] + ] + self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace) + + def _unplug(self, namespace, port_id): + port_stub = {'id': port_id} + self.vip_plug_callback('unplug', port_stub) + interface_name = self.vif_driver.get_device_name(Wrap(port_stub)) + self.vif_driver.unplug(interface_name, namespace=namespace) + + +# NOTE (markmcclain) For compliance with interface.py which expects objects +class Wrap(object): + """A light attribute wrapper for compatibility with the interface lib.""" + def __init__(self, d): + self.__dict__.update(d) + + def __getitem__(self, key): + return self.__dict__[key] + + +def get_ns_name(namespace_id): + return NS_PREFIX + namespace_id + + +def kill_pids_in_file(namespace_wrapper, pid_path): + if os.path.exists(pid_path): + with open(pid_path, 'r') as pids: + for pid in pids: + pid = pid.strip() + try: + namespace_wrapper.netns.execute( + ['kill', '-9', pid.strip()] + ) + except RuntimeError: + LOG.exception( + _('Unable to kill haproxy process: %s'), + pid + ) diff --git a/quantum/plugins/services/agent_loadbalancer/plugin.py b/quantum/plugins/services/agent_loadbalancer/plugin.py new file mode 100644 index 0000000000..3b7b48ec77 --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/plugin.py @@ -0,0 +1,338 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from oslo.config import cfg + +from quantum.common import exceptions as q_exc +from quantum.common import rpc as q_rpc +from quantum.common import topics +from quantum.db import api as qdbapi +from quantum.db.loadbalancer import loadbalancer_db +from quantum.openstack.common import log as logging +from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import proxy +from quantum.plugins.common import constants + +LOG = logging.getLogger(__name__) + +ACTIVE_PENDING = ( + constants.ACTIVE, + constants.PENDING_CREATE, + constants.PENDING_UPDATE +) + + +class LoadBalancerCallbacks(object): + RPC_API_VERSION = '1.0' + + def __init__(self, plugin): + self.plugin = plugin + + def create_rpc_dispatcher(self): + return q_rpc.PluginRpcDispatcher([self]) + + def get_ready_devices(self, context, host=None): + with context.session.begin(subtransactions=True): + qry = context.session.query( + loadbalancer_db.Vip, loadbalancer_db.Pool + ) + qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING)) + qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING)) + up = True # makes pep8 and sqlalchemy happy + qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up) + qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up) + return [p.id for v, p in qry.all()] + + def get_logical_device(self, context, pool_id=None, activate=True, + **kwargs): + with context.session.begin(subtransactions=True): + qry = context.session.query(loadbalancer_db.Pool) + qry = qry.filter_by(id=pool_id) + pool = qry.one() + + if activate: + # set all resources to active + if pool.status in ACTIVE_PENDING: + pool.status = constants.ACTIVE + + if pool.vip.status in ACTIVE_PENDING: + pool.vip.status = constants.ACTIVE + + for m in pool.members: + if m.status in ACTIVE_PENDING: + m.status = constants.ACTIVE + + for hm in pool.monitors: + if hm.monitor.status in ACTIVE_PENDING: + hm.monitor.status = constants.ACTIVE + + if (pool.status != constants.ACTIVE + or pool.vip.status != constants.ACTIVE): + raise Exception(_('Expected active pool and vip')) + + retval = {} + retval['pool'] = self.plugin._make_pool_dict(pool) + retval['vip'] = self.plugin._make_vip_dict(pool.vip) + retval['vip']['port'] = ( + self.plugin._core_plugin._make_port_dict(pool.vip.port) + ) + for fixed_ip in retval['vip']['port']['fixed_ips']: + fixed_ip['subnet'] = ( + self.plugin._core_plugin.get_subnet( + context, + fixed_ip['subnet_id'] + ) + ) + retval['members'] = [ + self.plugin._make_member_dict(m) + for m in pool.members if m.status == constants.ACTIVE + ] + retval['healthmonitors'] = [ + self.plugin._make_health_monitor_dict(hm.monitor) + for hm in pool.monitors + if hm.monitor.status == constants.ACTIVE + ] + + return retval + + def pool_destroyed(self, context, pool_id=None, host=None): + """Agent confirmation hook that a pool has been destroyed. + + This method exists for subclasses to change the deletion + behavior. + """ + pass + + def plug_vip_port(self, context, port_id=None, host=None): + if not port_id: + return + + port = self.plugin._core_plugin.get_port( + context, + port_id + ) + + port['admin_state_up'] = True + port['device_owner'] = 'quantum:' + constants.LOADBALANCER + port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host))) + + self.plugin._core_plugin.update_port( + context, + port_id, + {'port': port} + ) + + def unplug_vip_port(self, context, port_id=None, host=None): + if not port_id: + return + + port = self.plugin._core_plugin.get_port( + context, + port_id + ) + + port['admin_state_up'] = False + port['device_owner'] = '' + port['device_id'] = '' + + try: + self.plugin._core_plugin.update_port( + context, + port_id, + {'port': port} + ) + + except q_exc.PortNotFound: + msg = _('Unable to find port %s to unplug. This can occur when ' + 'the Vip has been deleted first.') + LOG.debug(msg, port_id) + + def update_pool_stats(self, context, pool_id=None, stats=None, host=None): + # TODO (markmcclain): add stats collection + pass + + +class LoadBalancerAgentApi(proxy.RpcProxy): + """Plugin side of plugin to agent RPC API.""" + + API_VERSION = '1.0' + + def __init__(self, topic, host): + super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION) + self.host = host + + def reload_pool(self, context, pool_id): + return self.cast( + context, + self.make_msg('reload_pool', pool_id=pool_id, host=self.host), + topic=self.topic + ) + + def destroy_pool(self, context, pool_id): + return self.cast( + context, + self.make_msg('destroy_pool', pool_id=pool_id, host=self.host), + topic=self.topic + ) + + def modify_pool(self, context, pool_id): + return self.cast( + context, + self.make_msg('modify_pool', pool_id=pool_id, host=self.host), + topic=self.topic + ) + + +class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): + + """ + Implementation of the Quantum Loadbalancer Service Plugin. + + This class manages the workflow of LBaaS request/response. + Most DB related works are implemented in class + loadbalancer_db.LoadBalancerPluginDb. + """ + supported_extension_aliases = ["lbaas"] + + def __init__(self): + """ + Do the initialization for the loadbalancer service plugin here. + """ + qdbapi.register_models() + + self.callbacks = LoadBalancerCallbacks(self) + + self.conn = rpc.create_connection(new=True) + self.conn.create_consumer( + topics.LOADBALANCER_PLUGIN, + self.callbacks.create_rpc_dispatcher(), + fanout=False) + self.conn.consume_in_thread() + + self.agent_rpc = LoadBalancerAgentApi( + topics.LOADBALANCER_AGENT, + cfg.CONF.host + ) + + def get_plugin_type(self): + return constants.LOADBALANCER + + def get_plugin_description(self): + return "Quantum LoadBalancer Service Plugin" + + def create_vip(self, context, vip): + vip['vip']['status'] = constants.PENDING_CREATE + v = super(LoadBalancerPlugin, self).create_vip(context, vip) + self.agent_rpc.reload_pool(context, v['pool_id']) + return v + + def update_vip(self, context, id, vip): + if 'status' not in vip['vip']: + vip['vip']['status'] = constants.PENDING_UPDATE + v = super(LoadBalancerPlugin, self).update_vip(context, id, vip) + if v['status'] in ACTIVE_PENDING: + self.agent_rpc.reload_pool(context, v['pool_id']) + else: + self.agent_rpc.destroy_pool(context, v['pool_id']) + return v + + def delete_vip(self, context, id): + vip = self.get_vip(context, id) + super(LoadBalancerPlugin, self).delete_vip(context, id) + self.agent_rpc.destroy_pool(context, vip['pool_id']) + pass + + def create_pool(self, context, pool): + p = super(LoadBalancerPlugin, self).create_pool(context, pool) + # don't notify here because a pool needs a vip to be useful + return p + + def update_pool(self, context, id, pool): + if 'status' not in pool['pool']: + pool['pool']['status'] = constants.PENDING_UPDATE + p = super(LoadBalancerPlugin, self).update_pool(context, id, pool) + if p['status'] in ACTIVE_PENDING: + self.agent_rpc.reload_pool(context, p['id']) + else: + self.agent_rpc.destroy_pool(context, p['id']) + return p + + def delete_pool(self, context, id): + super(LoadBalancerPlugin, self).delete_pool(context, id) + self.agent_rpc.destroy_pool(context, id) + + def create_member(self, context, member): + m = super(LoadBalancerPlugin, self).create_member(context, member) + self.agent_rpc.modify_pool(context, m['pool_id']) + return m + + def update_member(self, context, id, member): + if 'status' not in member['member']: + member['member']['status'] = constants.PENDING_UPDATE + m = super(LoadBalancerPlugin, self).update_member(context, id, member) + self.agent_rpc.modify_pool(context, m['pool_id']) + return m + + def delete_member(self, context, id): + m = self.get_member(context, id) + super(LoadBalancerPlugin, self).delete_member(context, id) + self.agent_rpc.modify_pool(context, m['pool_id']) + + def update_health_monitor(self, context, id, health_monitor): + if 'status' not in health_monitor['health_monitor']: + health_monitor['health_monitor']['status'] = ( + constants.PENDING_UPDATE + ) + hm = super(LoadBalancerPlugin, self).update_health_monitor( + context, + id, + health_monitor + ) + + with context.session.begin(subtransactions=True): + qry = context.session.query( + loadbalancer_db.PoolMonitorAssociation + ) + qry = qry.filter_by(monitor_id=hm['id']) + + for assoc in qry.all(): + self.agent_rpc.modify_pool(context, assoc['pool_id']) + return hm + + def delete_health_monitor(self, context, id): + with context.session.begin(subtransactions=True): + qry = context.session.query( + loadbalancer_db.PoolMonitorAssociation + ) + qry = qry.filter_by(monitor_id=id) + + pool_ids = [a['pool_id'] for a in qry.all()] + super(LoadBalancerPlugin, self).delete_health_monitor(context, id) + for pid in pool_ids: + self.agent_rpc.modify_pool(context, pid) + + def create_pool_health_monitor(self, context, health_monitor, pool_id): + retval = super(LoadBalancerPlugin, self).create_pool_health_monitor( + context, + health_monitor, + pool_id + ) + self.agent_rpc.modify_pool(context, pool_id) + + return retval diff --git a/quantum/plugins/services/loadbalancer/loadbalancerPlugin.py b/quantum/plugins/services/loadbalancer/loadbalancerPlugin.py deleted file mode 100644 index 27a4377ff5..0000000000 --- a/quantum/plugins/services/loadbalancer/loadbalancerPlugin.py +++ /dev/null @@ -1,252 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 OpenStack LLC. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -from quantum.db import api as qdbapi -from quantum.db import model_base -from quantum.db.loadbalancer import loadbalancer_db -from quantum.extensions import loadbalancer -from quantum.openstack.common import log as logging -from quantum.plugins.common import constants - -LOG = logging.getLogger(__name__) - - -class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): - - """ - Implementation of the Quantum Loadbalancer Service Plugin. - - This class manages the workflow of LBaaS request/response. - Most DB related works are implemented in class - loadbalancer_db.LoadBalancerPluginDb. - """ - supported_extension_aliases = ["lbaas"] - - def __init__(self): - """ - Do the initialization for the loadbalancer service plugin here. - """ - qdbapi.register_models(base=model_base.BASEV2) - - # TODO: we probably need to setup RPC channel (to talk to LbAgent) here - - def get_plugin_type(self): - return constants.LOADBALANCER - - def get_plugin_description(self): - return "Quantum LoadBalancer Service Plugin" - - def create_vip(self, context, vip): - v = super(LoadBalancerPlugin, self).create_vip(context, vip) - self.update_status(context, loadbalancer_db.Vip, v['id'], - constants.PENDING_CREATE) - LOG.debug(_("Create vip: %s"), v['id']) - - # If we adopt asynchronous mode, this method should return immediately - # and let client to query the object status. The plugin will listen on - # the event from device and update the object status by calling - # self.update_state(context, Vip, id, ACTIVE/ERROR) - # - # In synchronous mode, send the request to device here and wait for - # response. Eventually update the object status prior to the return. - v_query = self.get_vip(context, v['id']) - return v_query - - def update_vip(self, context, id, vip): - v_query = self.get_vip( - context, id, fields=["status"]) - if v_query['status'] in [ - constants.PENDING_DELETE, constants.ERROR]: - raise loadbalancer.StateInvalid(id=id, - state=v_query['status']) - - v = super(LoadBalancerPlugin, self).update_vip(context, id, vip) - self.update_status(context, loadbalancer_db.Vip, id, - constants.PENDING_UPDATE) - LOG.debug(_("Update vip: %s"), id) - - # TODO notify lbagent - v_rt = self.get_vip(context, id) - return v_rt - - def delete_vip(self, context, id): - self.update_status(context, loadbalancer_db.Vip, id, - constants.PENDING_DELETE) - LOG.debug(_("Delete vip: %s"), id) - - # TODO notify lbagent - super(LoadBalancerPlugin, self).delete_vip(context, id) - - def get_vip(self, context, id, fields=None): - res = super(LoadBalancerPlugin, self).get_vip(context, id, fields) - LOG.debug(_("Get vip: %s"), id) - return res - - def get_vips(self, context, filters=None, fields=None): - res = super(LoadBalancerPlugin, self).get_vips( - context, filters, fields) - LOG.debug(_("Get vips")) - return res - - def create_pool(self, context, pool): - p = super(LoadBalancerPlugin, self).create_pool(context, pool) - self.update_status(context, loadbalancer_db.Pool, p['id'], - constants.PENDING_CREATE) - LOG.debug(_("Create pool: %s"), p['id']) - - # TODO notify lbagent - p_rt = self.get_pool(context, p['id']) - return p_rt - - def update_pool(self, context, id, pool): - p_query = self.get_pool(context, id, fields=["status"]) - if p_query['status'] in [ - constants.PENDING_DELETE, constants.ERROR]: - raise loadbalancer.StateInvalid(id=id, - state=p_query['status']) - p = super(LoadBalancerPlugin, self).update_pool(context, id, pool) - LOG.debug(_("Update pool: %s"), p['id']) - # TODO notify lbagent - p_rt = self.get_pool(context, id) - return p_rt - - def delete_pool(self, context, id): - self.update_status(context, loadbalancer_db.Pool, id, - constants.PENDING_DELETE) - # TODO notify lbagent - super(LoadBalancerPlugin, self).delete_pool(context, id) - LOG.debug(_("Delete pool: %s"), id) - - def get_pool(self, context, id, fields=None): - res = super(LoadBalancerPlugin, self).get_pool(context, id, fields) - LOG.debug(_("Get pool: %s"), id) - return res - - def get_pools(self, context, filters=None, fields=None): - res = super(LoadBalancerPlugin, self).get_pools( - context, filters, fields) - LOG.debug(_("Get Pools")) - return res - - def stats(self, context, pool_id): - res = super(LoadBalancerPlugin, self).get_stats(context, pool_id) - LOG.debug(_("Get stats of Pool: %s"), pool_id) - return res - - def create_pool_health_monitor(self, context, health_monitor, pool_id): - m = super(LoadBalancerPlugin, self).create_pool_health_monitor( - context, health_monitor, pool_id) - LOG.debug(_("Create health_monitor of pool: %s"), pool_id) - return m - - def get_pool_health_monitor(self, context, id, pool_id, fields=None): - m = super(LoadBalancerPlugin, self).get_pool_health_monitor( - context, id, pool_id, fields) - LOG.debug(_("Get health_monitor of pool: %s"), pool_id) - return m - - def delete_pool_health_monitor(self, context, id, pool_id): - super(LoadBalancerPlugin, self).delete_pool_health_monitor( - context, id, pool_id) - LOG.debug(_("Delete health_monitor %(id)s of pool: %(pool_id)s"), - {"id": id, "pool_id": pool_id}) - - def get_member(self, context, id, fields=None): - res = super(LoadBalancerPlugin, self).get_member( - context, id, fields) - LOG.debug(_("Get member: %s"), id) - return res - - def get_members(self, context, filters=None, fields=None): - res = super(LoadBalancerPlugin, self).get_members( - context, filters, fields) - LOG.debug(_("Get members")) - return res - - def create_member(self, context, member): - m = super(LoadBalancerPlugin, self).create_member(context, member) - self.update_status(context, loadbalancer_db.Member, m['id'], - constants.PENDING_CREATE) - LOG.debug(_("Create member: %s"), m['id']) - # TODO notify lbagent - m_rt = self.get_member(context, m['id']) - return m_rt - - def update_member(self, context, id, member): - m_query = self.get_member(context, id, fields=["status"]) - if m_query['status'] in [ - constants.PENDING_DELETE, constants.ERROR]: - raise loadbalancer.StateInvalid(id=id, - state=m_query['status']) - m = super(LoadBalancerPlugin, self).update_member(context, id, member) - self.update_status(context, loadbalancer_db.Member, id, - constants.PENDING_UPDATE) - LOG.debug(_("Update member: %s"), m['id']) - # TODO notify lbagent - m_rt = self.get_member(context, id) - return m_rt - - def delete_member(self, context, id): - self.update_status(context, loadbalancer_db.Member, id, - constants.PENDING_DELETE) - LOG.debug(_("Delete member: %s"), id) - # TODO notify lbagent - super(LoadBalancerPlugin, self).delete_member(context, id) - - def get_health_monitor(self, context, id, fields=None): - res = super(LoadBalancerPlugin, self).get_health_monitor( - context, id, fields) - LOG.debug(_("Get health_monitor: %s"), id) - return res - - def get_health_monitors(self, context, filters=None, fields=None): - res = super(LoadBalancerPlugin, self).get_health_monitors( - context, filters, fields) - LOG.debug(_("Get health_monitors")) - return res - - def create_health_monitor(self, context, health_monitor): - h = super(LoadBalancerPlugin, self).create_health_monitor( - context, health_monitor) - self.update_status(context, loadbalancer_db.HealthMonitor, h['id'], - constants.PENDING_CREATE) - LOG.debug(_("Create health_monitor: %s"), h['id']) - # TODO notify lbagent - h_rt = self.get_health_monitor(context, h['id']) - return h_rt - - def update_health_monitor(self, context, id, health_monitor): - h_query = self.get_health_monitor(context, id, fields=["status"]) - if h_query['status'] in [ - constants.PENDING_DELETE, constants.ERROR]: - raise loadbalancer.StateInvalid(id=id, - state=h_query['status']) - h = super(LoadBalancerPlugin, self).update_health_monitor( - context, id, health_monitor) - self.update_status(context, loadbalancer_db.HealthMonitor, id, - constants.PENDING_UPDATE) - LOG.debug(_("Update health_monitor: %s"), h['id']) - # TODO notify lbagent - h_rt = self.get_health_monitor(context, id) - return h_rt - - def delete_health_monitor(self, context, id): - self.update_status(context, loadbalancer_db.HealthMonitor, id, - constants.PENDING_DELETE) - LOG.debug(_("Delete health_monitor: %s"), id) - super(LoadBalancerPlugin, self).delete_health_monitor(context, id) diff --git a/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py b/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py index f866377f50..a59992841a 100644 --- a/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py +++ b/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py @@ -34,7 +34,9 @@ import quantum.extensions from quantum.extensions import loadbalancer from quantum.manager import QuantumManager from quantum.plugins.common import constants -from quantum.plugins.services.loadbalancer import loadbalancerPlugin +from quantum.plugins.services.agent_loadbalancer import ( + plugin as loadbalancer_plugin +) from quantum.tests.unit import test_db_plugin from quantum.tests.unit import test_extensions from quantum.tests.unit import testlib_api @@ -46,8 +48,7 @@ LOG = logging.getLogger(__name__) DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2' DB_LB_PLUGIN_KLASS = ( - "quantum.plugins.services.loadbalancer." - "loadbalancerPlugin.LoadBalancerPlugin" + "quantum.plugins.services.agent_loadbalancer.plugin.LoadBalancerPlugin" ) ROOTDIR = os.path.dirname(__file__) + '../../../..' ETCDIR = os.path.join(ROOTDIR, 'etc') @@ -74,7 +75,7 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.QuantumDbPluginV2TestCase): self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14" - plugin = loadbalancerPlugin.LoadBalancerPlugin() + plugin = loadbalancer_plugin.LoadBalancerPlugin() ext_mgr = PluginAwareExtensionManager( extensions_path, {constants.LOADBALANCER: plugin} diff --git a/quantum/tests/unit/services/__init__.py b/quantum/tests/unit/services/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/tests/unit/services/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/tests/unit/services/agent_loadbalancer/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py b/quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py new file mode 100644 index 0000000000..2d5a2ee11f --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py @@ -0,0 +1,135 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import mock +import testtools + +from quantum.plugins.services.agent_loadbalancer.agent import api + + +class TestApiCache(testtools.TestCase): + def setUp(self): + super(TestApiCache, self).setUp() + self.addCleanup(mock.patch.stopall) + + self.api = api.LbaasAgentApi('topic', mock.sentinel.context, 'host') + self.make_msg = mock.patch.object(self.api, 'make_msg').start() + self.mock_call = mock.patch.object(self.api, 'call').start() + + def test_init(self): + self.assertEqual(self.api.host, 'host') + self.assertEqual(self.api.context, mock.sentinel.context) + + def test_get_ready_devices(self): + self.assertEqual( + self.api.get_ready_devices(), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with('get_ready_devices', host='host') + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_get_logical_device(self): + self.assertEqual( + self.api.get_logical_device('pool_id'), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'get_logical_device', + pool_id='pool_id', + host='host') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_pool_destroyed(self): + self.assertEqual( + self.api.pool_destroyed('pool_id'), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'pool_destroyed', + pool_id='pool_id', + host='host') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_plug_vip_port(self): + self.assertEqual( + self.api.plug_vip_port('port_id'), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'plug_vip_port', + port_id='port_id', + host='host') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_unplug_vip_port(self): + self.assertEqual( + self.api.unplug_vip_port('port_id'), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'unplug_vip_port', + port_id='port_id', + host='host') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) + + def test_update_pool_stats(self): + self.assertEqual( + self.api.update_pool_stats('pool_id', {'stat': 'stat'}), + self.mock_call.return_value + ) + + self.make_msg.assert_called_once_with( + 'update_pool_stats', + pool_id='pool_id', + stats={'stat': 'stat'}, + host='host') + + self.mock_call.assert_called_once_with( + mock.sentinel.context, + self.make_msg.return_value, + topic='topic' + ) diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py b/quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py new file mode 100644 index 0000000000..f2e9f22808 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py @@ -0,0 +1,55 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import contextlib +import mock +from oslo.config import cfg +import testtools + +from quantum.plugins.services.agent_loadbalancer import agent + + +class TestLbaasService(testtools.TestCase): + def setUp(self): + super(TestLbaasService, self).setUp() + self.addCleanup(cfg.CONF.reset) + + cfg.CONF.register_opts(agent.OPTS) + + def test_start(self): + with mock.patch.object( + agent.rpc_service.Service, 'start' + ) as mock_start: + + mgr = mock.Mock() + agent_service = agent.LbaasAgentService('host', 'topic', mgr) + agent_service.start() + + self.assertTrue(mock_start.called) + + def test_main(self): + with contextlib.nested( + mock.patch.object(agent.service, 'launch'), + mock.patch.object(agent, 'eventlet'), + mock.patch('sys.argv'), + mock.patch.object(agent.manager, 'LbaasAgentManager') + ) as (mock_launch, mock_eventlet, sys_argv, mgr_cls): + agent.main() + + self.assertTrue(mock_eventlet.monkey_patch.called) + mock_launch.assert_called_once_with(mock.ANY) diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py b/quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py new file mode 100644 index 0000000000..b025809c05 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py @@ -0,0 +1,365 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import contextlib + +import mock +import testtools + +from quantum.plugins.services.agent_loadbalancer.agent import manager + + +class TestLogicalDeviceCache(testtools.TestCase): + def setUp(self): + super(TestLogicalDeviceCache, self).setUp() + self.cache = manager.LogicalDeviceCache() + + def test_put(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + self.assertEqual(len(self.cache.devices), 1) + self.assertEqual(len(self.cache.port_lookup), 1) + self.assertEqual(len(self.cache.pool_lookup), 1) + + def test_double_put(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + self.cache.put(fake_device) + + self.assertEqual(len(self.cache.devices), 1) + self.assertEqual(len(self.cache.port_lookup), 1) + self.assertEqual(len(self.cache.pool_lookup), 1) + + def test_remove_in_cache(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + self.assertEqual(len(self.cache.devices), 1) + + self.cache.remove(fake_device) + + self.assertFalse(len(self.cache.devices)) + self.assertFalse(self.cache.port_lookup) + self.assertFalse(self.cache.pool_lookup) + + def test_remove_in_cache_same_object(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + self.assertEqual(len(self.cache.devices), 1) + + self.cache.remove(set(self.cache.devices).pop()) + + self.assertFalse(len(self.cache.devices)) + self.assertFalse(self.cache.port_lookup) + self.assertFalse(self.cache.pool_lookup) + + def test_remove_by_pool_id(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + self.assertEqual(len(self.cache.devices), 1) + + self.cache.remove_by_pool_id('pool_id') + + self.assertFalse(len(self.cache.devices)) + self.assertFalse(self.cache.port_lookup) + self.assertFalse(self.cache.pool_lookup) + + def test_get_by_pool_id(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + dev = self.cache.get_by_pool_id('pool_id') + + self.assertEqual(dev.pool_id, 'pool_id') + self.assertEqual(dev.port_id, 'port_id') + + def test_get_by_port_id(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + dev = self.cache.get_by_port_id('port_id') + + self.assertEqual(dev.pool_id, 'pool_id') + self.assertEqual(dev.port_id, 'port_id') + + def test_get_pool_ids(self): + fake_device = { + 'vip': {'port_id': 'port_id'}, + 'pool': {'id': 'pool_id'} + } + self.cache.put(fake_device) + + self.assertEqual(self.cache.get_pool_ids(), ['pool_id']) + + +class TestManager(testtools.TestCase): + def setUp(self): + super(TestManager, self).setUp() + self.addCleanup(mock.patch.stopall) + + mock_conf = mock.Mock() + mock_conf.interface_driver = 'intdriver' + mock_conf.device_driver = 'devdriver' + mock_conf.AGENT.root_helper = 'sudo' + mock_conf.loadbalancer_state_path = '/the/path' + + self.mock_importer = mock.patch.object(manager, 'importutils').start() + + rpc_mock_cls = mock.patch( + 'quantum.plugins.services.agent_loadbalancer.agent.api' + '.LbaasAgentApi' + ).start() + + self.mgr = manager.LbaasAgentManager(mock_conf) + self.rpc_mock = rpc_mock_cls.return_value + self.log = mock.patch.object(manager, 'LOG').start() + self.mgr.needs_resync = False + + def test_initialize_service_hook(self): + with mock.patch.object(self.mgr, 'sync_state') as sync: + self.mgr.initialize_service_hook(mock.Mock()) + sync.assert_called_once_with() + + def test_periodic_resync_needs_sync(self): + with mock.patch.object(self.mgr, 'sync_state') as sync: + self.mgr.needs_resync = True + self.mgr.periodic_resync(mock.Mock()) + sync.assert_called_once_with() + + def test_periodic_resync_no_sync(self): + with mock.patch.object(self.mgr, 'sync_state') as sync: + self.mgr.needs_resync = False + self.mgr.periodic_resync(mock.Mock()) + self.assertFalse(sync.called) + + def test_collect_stats(self): + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_pool_ids.return_value = ['1', '2'] + self.mgr.collect_stats(mock.Mock()) + self.rpc_mock.update_pool_stats.assert_has_calls([ + mock.call('1', mock.ANY), + mock.call('2', mock.ANY) + ]) + + def test_collect_stats_exception(self): + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_pool_ids.return_value = ['1', '2'] + with mock.patch.object(self.mgr, 'driver') as driver: + driver.get_stats.side_effect = Exception + + self.mgr.collect_stats(mock.Mock()) + + self.assertFalse(self.rpc_mock.called) + self.assertTrue(self.mgr.needs_resync) + self.assertTrue(self.log.exception.called) + + def test_vip_plug_callback(self): + self.mgr._vip_plug_callback('plug', {'id': 'id'}) + self.rpc_mock.plug_vip_port.assert_called_once_with('id') + + def test_vip_unplug_callback(self): + self.mgr._vip_plug_callback('unplug', {'id': 'id'}) + self.rpc_mock.unplug_vip_port.assert_called_once_with('id') + + def _sync_state_helper(self, cache, ready, refreshed, destroyed): + with contextlib.nested( + mock.patch.object(self.mgr, 'cache'), + mock.patch.object(self.mgr, 'refresh_device'), + mock.patch.object(self.mgr, 'destroy_device') + ) as (mock_cache, refresh, destroy): + + mock_cache.get_pool_ids.return_value = cache + self.rpc_mock.get_ready_devices.return_value = ready + + self.mgr.sync_state() + + self.assertEqual(len(refreshed), len(refresh.mock_calls)) + self.assertEqual(len(destroyed), len(destroy.mock_calls)) + + refresh.assert_has_calls([mock.call(i) for i in refreshed]) + destroy.assert_has_calls([mock.call(i) for i in destroyed]) + self.assertFalse(self.mgr.needs_resync) + + def test_sync_state_all_known(self): + self._sync_state_helper(['1', '2'], ['1', '2'], ['1', '2'], []) + + def test_sync_state_all_unknown(self): + self._sync_state_helper([], ['1', '2'], ['1', '2'], []) + + def test_sync_state_destroy_all(self): + self._sync_state_helper(['1', '2'], [], [], ['1', '2']) + + def test_sync_state_both(self): + self._sync_state_helper(['1'], ['2'], ['2'], ['1']) + + def test_sync_state_exception(self): + self.rpc_mock.get_ready_devices.side_effect = Exception + + self.mgr.sync_state() + + self.assertTrue(self.log.exception.called) + self.assertTrue(self.mgr.needs_resync) + + def test_refresh_device_exists(self): + config = self.rpc_mock.get_logical_device.return_value + + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + driver.exists.return_value = True + + self.mgr.refresh_device(config) + + driver.exists.assert_called_once_with(config) + driver.update.assert_called_once_with(config) + cache.put.assert_called_once_with(config) + self.assertFalse(self.mgr.needs_resync) + + def test_refresh_device_new(self): + config = self.rpc_mock.get_logical_device.return_value + + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + driver.exists.return_value = False + + self.mgr.refresh_device(config) + + driver.exists.assert_called_once_with(config) + driver.create.assert_called_once_with(config) + cache.put.assert_called_once_with(config) + self.assertFalse(self.mgr.needs_resync) + + def test_refresh_device_exception(self): + config = self.rpc_mock.get_logical_device.return_value + + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + driver.exists.side_effect = Exception + self.mgr.refresh_device(config) + + driver.exists.assert_called_once_with(config) + self.assertTrue(self.mgr.needs_resync) + self.assertTrue(self.log.exception.called) + self.assertFalse(cache.put.called) + + def test_destroy_device_known(self): + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = True + + self.mgr.destroy_device('pool_id') + cache.get_by_pool_id.assert_called_once_with('pool_id') + driver.destroy.assert_called_once_with('pool_id') + self.rpc_mock.pool_destroyed.assert_called_once_with( + 'pool_id' + ) + cache.remove.assert_called_once_with(True) + self.assertFalse(self.mgr.needs_resync) + + def test_destroy_device_unknown(self): + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = None + + self.mgr.destroy_device('pool_id') + cache.get_by_pool_id.assert_called_once_with('pool_id') + self.assertFalse(driver.destroy.called) + + def test_destroy_device_exception(self): + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = True + driver.destroy.side_effect = Exception + + self.mgr.destroy_device('pool_id') + cache.get_by_pool_id.assert_called_once_with('pool_id') + + self.assertTrue(self.log.exception.called) + self.assertTrue(self.mgr.needs_resync) + + def test_remove_orphans(self): + with mock.patch.object(self.mgr, 'driver') as driver: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_pool_ids.return_value = ['1', '2'] + self.mgr.remove_orphans() + + driver.remove_orphans.assert_called_once_with(['1', '2']) + + def test_reload_pool(self): + with mock.patch.object(self.mgr, 'refresh_device') as refresh: + self.mgr.reload_pool(mock.Mock(), pool_id='pool_id') + refresh.assert_called_once_with('pool_id') + + def test_modify_pool_known(self): + with mock.patch.object(self.mgr, 'refresh_device') as refresh: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = True + + self.mgr.reload_pool(mock.Mock(), pool_id='pool_id') + + refresh.assert_called_once_with('pool_id') + + def test_modify_pool_unknown(self): + with mock.patch.object(self.mgr, 'refresh_device') as refresh: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = False + + self.mgr.modify_pool(mock.Mock(), pool_id='pool_id') + + self.assertFalse(refresh.called) + + def test_destroy_pool_known(self): + with mock.patch.object(self.mgr, 'destroy_device') as destroy: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = True + + self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id') + + destroy.assert_called_once_with('pool_id') + + def test_destroy_pool_unknown(self): + with mock.patch.object(self.mgr, 'destroy_device') as destroy: + with mock.patch.object(self.mgr, 'cache') as cache: + cache.get_by_pool_id.return_value = False + + self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id') + + self.assertFalse(destroy.called) diff --git a/quantum/tests/unit/services/agent_loadbalancer/driver/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/driver/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/driver/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/__init__.py new file mode 100644 index 0000000000..ce18bf6d68 --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost diff --git a/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_namespace_driver.py b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_namespace_driver.py new file mode 100644 index 0000000000..16c7e3593d --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_namespace_driver.py @@ -0,0 +1,131 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import contextlib +import mock +import testtools + +from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import ( + namespace_driver +) + + +class TestHaproxyNSDriver(testtools.TestCase): + def setUp(self): + super(TestHaproxyNSDriver, self).setUp() + + self.vif_driver = mock.Mock() + self.vip_plug_callback = mock.Mock() + + self.driver = namespace_driver.HaproxyNSDriver( + 'sudo', + '/the/path', + self.vif_driver, + self.vip_plug_callback + ) + + self.fake_config = { + 'pool': {'id': 'pool_id'}, + 'vip': {'id': 'vip_id', 'port': {'id': 'port_id'}} + } + + def test_create(self): + with mock.patch.object(self.driver, '_plug') as plug: + with mock.patch.object(self.driver, '_spawn') as spawn: + self.driver.create(self.fake_config) + + plug.assert_called_once_with( + 'qlbaas-pool_id', {'id': 'port_id'} + ) + spawn.assert_called_once_with(self.fake_config) + + def test_update(self): + with contextlib.nested( + mock.patch.object(self.driver, '_get_state_file_path'), + mock.patch.object(self.driver, '_spawn'), + mock.patch('__builtin__.open') + ) as (gsp, spawn, mock_open): + mock_open.return_value = ['5'] + + self.driver.update(self.fake_config) + + mock_open.assert_called_once_with(gsp.return_value, 'r') + spawn.assert_called_once_with(self.fake_config, ['-sf', '5']) + + def test_spawn(self): + with contextlib.nested( + mock.patch.object(namespace_driver.hacfg, 'save_config'), + mock.patch.object(self.driver, '_get_state_file_path'), + mock.patch('quantum.agent.linux.ip_lib.IPWrapper') + ) as (mock_save, gsp, ip_wrap): + gsp.side_effect = lambda x, y: y + + self.driver._spawn(self.fake_config) + + mock_save.assert_called_once_with('conf', self.fake_config, 'sock') + cmd = ['haproxy', '-f', 'conf', '-p', 'pid'] + ip_wrap.assert_has_calls([ + mock.call('sudo', 'qlbaas-pool_id'), + mock.call().netns.execute(cmd) + ]) + + def test_destroy(self): + with contextlib.nested( + mock.patch.object(self.driver, '_get_state_file_path'), + mock.patch.object(namespace_driver, 'kill_pids_in_file'), + mock.patch.object(self.driver, '_unplug'), + mock.patch('quantum.agent.linux.ip_lib.IPWrapper'), + mock.patch('os.path.isdir'), + mock.patch('shutil.rmtree') + ) as (gsp, kill, unplug, ip_wrap, isdir, rmtree): + gsp.side_effect = lambda x, y: '/pool/' + y + + self.driver.pool_to_port_id['pool_id'] = 'port_id' + isdir.return_value = True + + self.driver.destroy('pool_id') + + kill.assert_called_once_with(ip_wrap(), '/pool/pid') + unplug.assert_called_once_with('qlbaas-pool_id', 'port_id') + isdir.called_once_with('/pool') + rmtree.assert_called_once_with('/pool') + ip_wrap.assert_has_calls([ + mock.call('sudo', 'qlbaas-pool_id'), + mock.call().garbage_collect_namespace() + ]) + + def test_exists(self): + with contextlib.nested( + mock.patch.object(self.driver, '_get_state_file_path'), + mock.patch('quantum.agent.linux.ip_lib.IPWrapper'), + mock.patch('socket.socket'), + mock.patch('os.path.exists'), + ) as (gsp, ip_wrap, socket, path_exists): + gsp.side_effect = lambda x, y: '/pool/' + y + + ip_wrap.return_value.netns.exists.return_value = True + path_exists.return_value = True + + self.driver.exists('pool_id') + + ip_wrap.assert_has_calls([ + mock.call('sudo'), + mock.call().netns.exists('qlbaas-pool_id') + ]) + + self.assertTrue(self.driver.exists('pool_id')) diff --git a/quantum/tests/unit/services/agent_loadbalancer/test_plugin.py b/quantum/tests/unit/services/agent_loadbalancer/test_plugin.py new file mode 100644 index 0000000000..a4289ef0cf --- /dev/null +++ b/quantum/tests/unit/services/agent_loadbalancer/test_plugin.py @@ -0,0 +1,263 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import mock +import testtools + +from quantum import context +from quantum import manager +from quantum.plugins.common import constants +from quantum.plugins.services.agent_loadbalancer import plugin +from quantum.tests.unit.db.loadbalancer import test_db_loadbalancer + + +class TestLoadBalancerPluginBase( + test_db_loadbalancer.LoadBalancerPluginDbTestCase): + + def setUp(self): + super(TestLoadBalancerPluginBase, self).setUp() + + # create another API instance to make testing easier + # pass a mock to our API instance + + # we need access to loaded plugins to modify models + loaded_plugins = manager.QuantumManager().get_service_plugins() + self.plugin_instance = loaded_plugins[constants.LOADBALANCER] + self.callbacks = self.plugin_instance.callbacks + + +class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): + def test_get_ready_devices(self): + with self.vip() as vip: + ready = self.callbacks.get_ready_devices( + context.get_admin_context(), + ) + self.assertEqual(ready, [vip['vip']['pool_id']]) + + def test_get_ready_devices_inactive_vip(self): + with self.vip() as vip: + + # set the vip inactive need to use plugin directly since + # status is not tenant mutable + self.plugin_instance.update_vip( + context.get_admin_context(), + vip['vip']['id'], + {'vip': {'status': constants.INACTIVE}} + ) + + ready = self.callbacks.get_ready_devices( + context.get_admin_context(), + ) + self.assertFalse(ready) + + def test_get_ready_devices_inactive_pool(self): + with self.vip() as vip: + + # set the pool inactive need to use plugin directly since + # status is not tenant mutable + self.plugin_instance.update_pool( + context.get_admin_context(), + vip['vip']['pool_id'], + {'pool': {'status': constants.INACTIVE}} + ) + + ready = self.callbacks.get_ready_devices( + context.get_admin_context(), + ) + self.assertFalse(ready) + + def test_get_logical_device_inactive(self): + with self.pool() as pool: + with self.vip(pool=pool) as vip: + with self.member(pool_id=vip['vip']['pool_id']) as member: + self.assertRaises( + Exception, + self.callbacks.get_logical_device, + context.get_admin_context(), + pool['pool']['id'], + activate=False + ) + + def test_get_logical_device_activate(self): + with self.pool() as pool: + with self.vip(pool=pool) as vip: + with self.member(pool_id=vip['vip']['pool_id']) as member: + ctx = context.get_admin_context() + + # build the expected + port = self.plugin_instance._core_plugin.get_port( + ctx, vip['vip']['port_id'] + ) + subnet = self.plugin_instance._core_plugin.get_subnet( + ctx, vip['vip']['subnet_id'] + ) + port['fixed_ips'][0]['subnet'] = subnet + + # reload pool to add members and vip + pool = self.plugin_instance.get_pool( + ctx, pool['pool']['id'] + ) + + pool['status'] = constants.ACTIVE + vip['vip']['status'] = constants.ACTIVE + vip['vip']['port'] = port + member['member']['status'] = constants.ACTIVE + + expected = { + 'pool': pool, + 'vip': vip['vip'], + 'members': [member['member']], + 'healthmonitors': [] + } + + logical_config = self.callbacks.get_logical_device( + ctx, pool['id'], activate=True + ) + + self.assertEqual(logical_config, expected) + + def _update_port_test_helper(self, expected, func, **kwargs): + core = self.plugin_instance._core_plugin + + with self.pool() as pool: + with self.vip(pool=pool) as vip: + with self.member(pool_id=vip['vip']['pool_id']) as member: + ctx = context.get_admin_context() + func(ctx, port_id=vip['vip']['port_id'], **kwargs) + + db_port = core.get_port(ctx, vip['vip']['port_id']) + + for k, v in expected.iteritems(): + self.assertEqual(db_port[k], v) + + def test_plug_vip_port(self): + exp = { + 'device_owner': 'quantum:' + constants.LOADBALANCER, + 'device_id': 'c596ce11-db30-5c72-8243-15acaae8690f', + 'admin_state_up': True + } + self._update_port_test_helper( + exp, + self.callbacks.plug_vip_port, + host='host' + ) + + def test_unplug_vip_port(self): + exp = { + 'device_owner': '', + 'device_id': '', + 'admin_state_up': False + } + self._update_port_test_helper( + exp, + self.callbacks.unplug_vip_port, + host='host' + ) + + +class TestLoadBalancerAgentApi(testtools.TestCase): + def setUp(self): + super(TestLoadBalancerAgentApi, self).setUp() + self.addCleanup(mock.patch.stopall) + + self.api = plugin.LoadBalancerAgentApi('topic', 'host') + self.mock_cast = mock.patch.object(self.api, 'cast').start() + self.mock_msg = mock.patch.object(self.api, 'make_msg').start() + + def test_init(self): + self.assertEqual(self.api.topic, 'topic') + self.assertEqual(self.api.host, 'host') + + def _call_test_helper(self, method_name): + rv = getattr(self.api, method_name)(mock.sentinel.context, 'the_id') + self.assertEqual(rv, self.mock_cast.return_value) + self.mock_cast.assert_called_once_with( + mock.sentinel.context, + self.mock_msg.return_value, + topic='topic' + ) + + self.mock_msg.assert_called_once_with( + method_name, + pool_id='the_id', + host='host' + ) + + def test_reload_pool(self): + self._call_test_helper('reload_pool') + + def test_destroy_pool(self): + self._call_test_helper('destroy_pool') + + def test_modify_pool(self): + self._call_test_helper('modify_pool') + + +class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): + def setUp(self): + self.log = mock.patch.object(plugin, 'LOG') + api_cls = mock.patch.object(plugin, 'LoadBalancerAgentApi').start() + super(TestLoadBalancerPluginNotificationWrapper, self).setUp() + self.mock_api = api_cls.return_value + + self.addCleanup(mock.patch.stopall) + + def test_create_vip(self): + with self.subnet() as subnet: + with self.pool(subnet=subnet) as pool: + with self.vip(pool=pool, subnet=subnet) as vip: + self.mock_api.reload_pool.assert_called_once_with( + mock.ANY, + vip['vip']['pool_id'] + ) + + def test_update_vip(self): + with self.subnet() as subnet: + with self.pool(subnet=subnet) as pool: + with self.vip(pool=pool, subnet=subnet) as vip: + self.mock_api.reset_mock() + ctx = context.get_admin_context() + vip['vip'].pop('status') + new_vip = self.plugin_instance.update_vip( + ctx, + vip['vip']['id'], + vip + ) + + self.mock_api.reload_pool.assert_called_once_with( + mock.ANY, + vip['vip']['pool_id'] + ) + + self.assertEqual( + new_vip['status'], + constants.PENDING_UPDATE + ) + + def t2est_delete_vip(self): + with self.subnet() as subnet: + with self.pool(subnet=subnet) as pool: + with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip: + self.mock_api.reset_mock() + ctx = context.get_admin_context() + self.plugin_instance.delete_vip(context, vip['vip']['id']) + self.mock_api.destroy_pool.assert_called_once_with( + mock.ANY, + vip['vip']['pool_id'] + ) diff --git a/quantum/tests/unit/test_agent_linux_utils.py b/quantum/tests/unit/test_agent_linux_utils.py index 840d9dcb7c..b1f5c0e7fa 100644 --- a/quantum/tests/unit/test_agent_linux_utils.py +++ b/quantum/tests/unit/test_agent_linux_utils.py @@ -70,3 +70,21 @@ class AgentUtilsGetInterfaceMAC(testtools.TestCase): '\x00' * 232]) actual_val = utils.get_interface_mac('eth0') self.assertEqual(actual_val, expect_val) + + +class AgentUtilsReplaceFile(testtools.TestCase): + def test_replace_file(self): + # make file to replace + with mock.patch('tempfile.NamedTemporaryFile') as ntf: + ntf.return_value.name = '/baz' + with mock.patch('os.chmod') as chmod: + with mock.patch('os.rename') as rename: + utils.replace_file('/foo', 'bar') + + expected = [mock.call('w+', dir='/', delete=False), + mock.call().write('bar'), + mock.call().close()] + + ntf.assert_has_calls(expected) + chmod.assert_called_once_with('/baz', 0644) + rename.assert_called_once_with('/baz', '/foo') diff --git a/quantum/tests/unit/test_linux_dhcp.py b/quantum/tests/unit/test_linux_dhcp.py index c37139a93a..ee9557cff0 100644 --- a/quantum/tests/unit/test_linux_dhcp.py +++ b/quantum/tests/unit/test_linux_dhcp.py @@ -138,22 +138,6 @@ class TestDhcpBase(testtools.TestCase): def test_base_abc_error(self): self.assertRaises(TypeError, dhcp.DhcpBase, None) - def test_replace_file(self): - # make file to replace - with mock.patch('tempfile.NamedTemporaryFile') as ntf: - ntf.return_value.name = '/baz' - with mock.patch('os.chmod') as chmod: - with mock.patch('os.rename') as rename: - dhcp.replace_file('/foo', 'bar') - - expected = [mock.call('w+', dir='/', delete=False), - mock.call().write('bar'), - mock.call().close()] - - ntf.assert_has_calls(expected) - chmod.assert_called_once_with('/baz', 0644) - rename.assert_called_once_with('/baz', '/foo') - def test_restart(self): class SubClass(dhcp.DhcpBase): def __init__(self): @@ -212,7 +196,7 @@ class TestBase(testtools.TestCase): self.conf.set_override('state_path', '') self.conf.use_namespaces = True - self.replace_p = mock.patch('quantum.agent.linux.dhcp.replace_file') + self.replace_p = mock.patch('quantum.agent.linux.utils.replace_file') self.execute_p = mock.patch('quantum.agent.linux.utils.execute') self.addCleanup(self.execute_p.stop) self.safe = self.replace_p.start() @@ -392,7 +376,7 @@ class TestDhcpLocalProcess(TestBase): self.assertEqual(lp.interface_name, 'tap0') def test_set_interface_name(self): - with mock.patch('quantum.agent.linux.dhcp.replace_file') as replace: + with mock.patch('quantum.agent.linux.utils.replace_file') as replace: lp = LocalChild(self.conf, FakeDualNetwork()) with mock.patch.object(lp, 'get_conf_file_name') as conf_file: conf_file.return_value = '/interface' diff --git a/setup.py b/setup.py index e265ab2eca..ac701900a7 100644 --- a/setup.py +++ b/setup.py @@ -137,6 +137,8 @@ else: 'quantum-debug = quantum.debug.shell:main', 'quantum-ovs-cleanup = quantum.agent.ovs_cleanup_util:main', 'quantum-db-manage = quantum.db.migration.cli:main', + ('quantum-lbaas-agent = ' + 'quantum.plugins.services.agent_loadbalancer.agent:main'), ('quantum-check-nvp-config = ' 'quantum.plugins.nicira.nicira_nvp_plugin.check_nvp_config:main'), ]