Merge "LBaaS Agent Reference Implementation"
This commit is contained in:
commit
b5c0ab7935
26
bin/quantum-lbaas-agent
Executable file
26
bin/quantum-lbaas-agent
Executable file
@ -0,0 +1,26 @@
|
||||
#!/usr/bin/env python
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2013 Openstack, LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(0, os.getcwd())
|
||||
|
||||
from quantum.plugins.services.agent_loadbalancer.agent import main
|
||||
|
||||
|
||||
main()
|
24
etc/lbaas_agent.ini
Normal file
24
etc/lbaas_agent.ini
Normal file
@ -0,0 +1,24 @@
|
||||
[DEFAULT]
|
||||
# Show debugging output in log (sets DEBUG log level output)
|
||||
# debug = true
|
||||
|
||||
# The LBaaS agent will resync its state with Quantum to recover from any
|
||||
# transient notification or rpc errors. The interval is number of
|
||||
# seconds between attempts.
|
||||
# periodic_interval = 10
|
||||
|
||||
# OVS based plugins(OVS, Ryu, NEC, NVP, BigSwitch/Floodlight)
|
||||
interface_driver = quantum.agent.linux.interface.OVSInterfaceDriver
|
||||
# OVS based plugins(Ryu, NEC, NVP, BigSwitch/Floodlight) that use OVS
|
||||
# as OpenFlow switch and check port status
|
||||
# ovs_use_veth = True
|
||||
# LinuxBridge
|
||||
# interface_driver = quantum.agent.linux.interface.BridgeInterfaceDriver
|
||||
|
||||
# The agent requires a driver to manage the loadbalancer. HAProxy is the
|
||||
# opensource version.
|
||||
device_driver = quantum.plugins.services.agent_loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver
|
||||
|
||||
# Allow overlapping IP (Must have kernel build with CONFIG_NET_NS=y and
|
||||
# iproute2 package that supports namespaces).
|
||||
# use_namespaces = True
|
29
etc/quantum/rootwrap.d/lbaas-haproxy.filters
Normal file
29
etc/quantum/rootwrap.d/lbaas-haproxy.filters
Normal file
@ -0,0 +1,29 @@
|
||||
# quantum-rootwrap command filters for nodes on which quantum is
|
||||
# expected to control network
|
||||
#
|
||||
# This file should be owned by (and only-writeable by) the root user
|
||||
|
||||
# format seems to be
|
||||
# cmd-name: filter-name, raw-command, user, args
|
||||
|
||||
[Filters]
|
||||
|
||||
# haproxy
|
||||
haproxy: CommandFilter, /usr/sbin/haproxy, root
|
||||
|
||||
# lbaas-agent uses kill as well, that's handled by the generic KillFilter
|
||||
kill_haproxy_usr: KillFilter, root, /usr/sbin/haproxy, -9, -HUP
|
||||
|
||||
# lbaas-agent uses cat
|
||||
cat: RegExpFilter, /bin/cat, root, cat, /proc/\d+/cmdline
|
||||
|
||||
ovs-vsctl: CommandFilter, /bin/ovs-vsctl, root
|
||||
ovs-vsctl_usr: CommandFilter, /usr/bin/ovs-vsctl, root
|
||||
ovs-vsctl_sbin: CommandFilter, /sbin/ovs-vsctl, root
|
||||
ovs-vsctl_sbin_usr: CommandFilter, /usr/sbin/ovs-vsctl, root
|
||||
|
||||
# ip_lib
|
||||
ip: IpFilter, /sbin/ip, root
|
||||
ip_usr: IpFilter, /usr/sbin/ip, root
|
||||
ip_exec: IpNetnsExecFilter, /sbin/ip, root
|
||||
ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root
|
@ -21,7 +21,6 @@ import re
|
||||
import socket
|
||||
import StringIO
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
import netaddr
|
||||
from oslo.config import cfg
|
||||
@ -187,7 +186,7 @@ class DhcpLocalProcess(DhcpBase):
|
||||
def interface_name(self, value):
|
||||
interface_file_path = self.get_conf_file_name('interface',
|
||||
ensure_conf_dir=True)
|
||||
replace_file(interface_file_path, value)
|
||||
utils.replace_file(interface_file_path, value)
|
||||
|
||||
@abc.abstractmethod
|
||||
def spawn_process(self):
|
||||
@ -298,7 +297,7 @@ class Dnsmasq(DhcpLocalProcess):
|
||||
(port.mac_address, name, alloc.ip_address))
|
||||
|
||||
name = self.get_conf_file_name('host')
|
||||
replace_file(name, buf.getvalue())
|
||||
utils.replace_file(name, buf.getvalue())
|
||||
return name
|
||||
|
||||
def _output_opts_file(self):
|
||||
@ -344,7 +343,7 @@ class Dnsmasq(DhcpLocalProcess):
|
||||
options.append(self._format_option(i, 'router'))
|
||||
|
||||
name = self.get_conf_file_name('opts')
|
||||
replace_file(name, '\n'.join(options))
|
||||
utils.replace_file(name, '\n'.join(options))
|
||||
return name
|
||||
|
||||
def _make_subnet_interface_ip_map(self):
|
||||
@ -402,20 +401,3 @@ class Dnsmasq(DhcpLocalProcess):
|
||||
sock.connect(dhcp_relay_socket)
|
||||
sock.send(jsonutils.dumps(data))
|
||||
sock.close()
|
||||
|
||||
|
||||
def replace_file(file_name, data):
|
||||
"""Replaces the contents of file_name with data in a safe manner.
|
||||
|
||||
First write to a temp file and then rename. Since POSIX renames are
|
||||
atomic, the file is unlikely to be corrupted by competing writes.
|
||||
|
||||
We create the tempfile on the same device to ensure that it can be renamed.
|
||||
"""
|
||||
|
||||
base_dir = os.path.dirname(os.path.abspath(file_name))
|
||||
tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False)
|
||||
tmp_file.write(data)
|
||||
tmp_file.close()
|
||||
os.chmod(tmp_file.name, 0644)
|
||||
os.rename(tmp_file.name, file_name)
|
||||
|
@ -22,6 +22,7 @@ import os
|
||||
import shlex
|
||||
import socket
|
||||
import struct
|
||||
import tempfile
|
||||
|
||||
from eventlet.green import subprocess
|
||||
|
||||
@ -71,3 +72,20 @@ def get_interface_mac(interface):
|
||||
struct.pack('256s', interface[:DEVICE_NAME_LEN]))
|
||||
return ''.join(['%02x:' % ord(char)
|
||||
for char in info[MAC_START:MAC_END]])[:-1]
|
||||
|
||||
|
||||
def replace_file(file_name, data):
|
||||
"""Replaces the contents of file_name with data in a safe manner.
|
||||
|
||||
First write to a temp file and then rename. Since POSIX renames are
|
||||
atomic, the file is unlikely to be corrupted by competing writes.
|
||||
|
||||
We create the tempfile on the same device to ensure that it can be renamed.
|
||||
"""
|
||||
|
||||
base_dir = os.path.dirname(os.path.abspath(file_name))
|
||||
tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False)
|
||||
tmp_file.write(data)
|
||||
tmp_file.close()
|
||||
os.chmod(tmp_file.name, 0644)
|
||||
os.rename(tmp_file.name, file_name)
|
||||
|
@ -25,9 +25,11 @@ UPDATE = 'update'
|
||||
AGENT = 'q-agent-notifier'
|
||||
PLUGIN = 'q-plugin'
|
||||
DHCP = 'q-dhcp-notifer'
|
||||
LOADBALANCER_PLUGIN = 'q-loadbalancer-plugin'
|
||||
|
||||
L3_AGENT = 'l3_agent'
|
||||
DHCP_AGENT = 'dhcp_agent'
|
||||
LOADBALANCER_AGENT = 'loadbalancer_agent'
|
||||
|
||||
|
||||
def get_topic_name(prefix, table, operation):
|
||||
|
@ -69,7 +69,7 @@ class Vip(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
|
||||
protocol_port = sa.Column(sa.Integer, nullable=False)
|
||||
protocol = sa.Column(sa.Enum("HTTP", "HTTPS", "TCP", name="lb_protocols"),
|
||||
nullable=False)
|
||||
pool_id = sa.Column(sa.String(36), nullable=False)
|
||||
pool_id = sa.Column(sa.String(36), nullable=False, unique=True)
|
||||
session_persistence = orm.relationship(SessionPersistence,
|
||||
uselist=False,
|
||||
backref="vips",
|
||||
@ -114,6 +114,7 @@ class Pool(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
|
||||
cascade="all, delete-orphan")
|
||||
monitors = orm.relationship("PoolMonitorAssociation", backref="pools",
|
||||
cascade="all, delete-orphan")
|
||||
vip = orm.relationship(Vip, backref='pool')
|
||||
|
||||
|
||||
class HealthMonitor(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
|
||||
@ -239,6 +240,12 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
raise
|
||||
return r
|
||||
|
||||
def assert_modification_allowed(self, obj):
|
||||
status = getattr(obj, 'status', None)
|
||||
|
||||
if status == constants.PENDING_DELETE:
|
||||
raise loadbalancer.StateInvalid(id=id, state=status)
|
||||
|
||||
########################################################
|
||||
# VIP DB access
|
||||
def _make_vip_dict(self, vip, fields=None):
|
||||
@ -270,11 +277,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
|
||||
return self._fields(res, fields)
|
||||
|
||||
def _update_pool_vip_info(self, context, pool_id, vip_id):
|
||||
pool_db = self._get_resource(context, Pool, pool_id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
pool_db.update({'vip_id': vip_id})
|
||||
|
||||
def _check_session_persistence_info(self, info):
|
||||
""" Performs sanity check on session persistence info.
|
||||
:param info: Session persistence info
|
||||
@ -355,6 +357,14 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
tenant_id = self._get_tenant_id_for_create(context, v)
|
||||
|
||||
with context.session.begin(subtransactions=True):
|
||||
# validate that the pool has same tenant
|
||||
if v['pool_id']:
|
||||
pool = self._get_resource(context, Pool, v['pool_id'])
|
||||
if pool['tenant_id'] != tenant_id:
|
||||
raise q_exc.NotAuthorized()
|
||||
else:
|
||||
pool = None
|
||||
|
||||
vip_db = Vip(id=uuidutils.generate_uuid(),
|
||||
tenant_id=tenant_id,
|
||||
name=v['name'],
|
||||
@ -367,16 +377,18 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
admin_state_up=v['admin_state_up'],
|
||||
status=constants.PENDING_CREATE)
|
||||
|
||||
vip_id = vip_db['id']
|
||||
session_info = v['session_persistence']
|
||||
|
||||
if session_info:
|
||||
s_p = self._create_session_persistence_db(session_info, vip_id)
|
||||
s_p = self._create_session_persistence_db(
|
||||
session_info,
|
||||
vip_db['id'])
|
||||
vip_db.session_persistence = s_p
|
||||
|
||||
context.session.add(vip_db)
|
||||
context.session.flush()
|
||||
|
||||
# create a port to reserve address for IPAM
|
||||
self._create_port_for_vip(
|
||||
context,
|
||||
vip_db,
|
||||
@ -384,7 +396,9 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
v.get('address')
|
||||
)
|
||||
|
||||
self._update_pool_vip_info(context, v['pool_id'], vip_id)
|
||||
if pool:
|
||||
pool['vip_id'] = vip_db['id']
|
||||
|
||||
return self._make_vip_dict(vip_db)
|
||||
|
||||
def update_vip(self, context, id, vip):
|
||||
@ -392,20 +406,36 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
|
||||
sess_persist = v.pop('session_persistence', None)
|
||||
with context.session.begin(subtransactions=True):
|
||||
vip_db = self._get_resource(context, Vip, id)
|
||||
|
||||
self.assert_modification_allowed(vip_db)
|
||||
|
||||
if sess_persist:
|
||||
self._update_vip_session_persistence(context, id, sess_persist)
|
||||
else:
|
||||
self._delete_session_persistence(context, id)
|
||||
|
||||
vip_db = self._get_resource(context, Vip, id)
|
||||
old_pool_id = vip_db['pool_id']
|
||||
if v:
|
||||
vip_db.update(v)
|
||||
# If the pool_id is changed, we need to update
|
||||
# the associated pools
|
||||
if 'pool_id' in v:
|
||||
self._update_pool_vip_info(context, old_pool_id, None)
|
||||
self._update_pool_vip_info(context, v['pool_id'], id)
|
||||
new_pool = self._get_resource(context, Pool, v['pool_id'])
|
||||
self.assert_modification_allowed(new_pool)
|
||||
|
||||
# check that the pool matches the tenant_id
|
||||
if new_pool['tenant_id'] != vip_db['tenant_id']:
|
||||
raise q_exc.NotAuthorized()
|
||||
|
||||
if vip_db['pool_id']:
|
||||
old_pool = self._get_resource(
|
||||
context,
|
||||
Pool,
|
||||
vip_db['pool_id']
|
||||
)
|
||||
old_pool['vip_id'] = None
|
||||
|
||||
new_pool['vip_id'] = vip_db['id']
|
||||
|
||||
return self._make_vip_dict(vip_db)
|
||||
|
||||
@ -432,7 +462,7 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
|
||||
########################################################
|
||||
# Pool DB access
|
||||
def _make_pool_dict(self, context, pool, fields=None):
|
||||
def _make_pool_dict(self, pool, fields=None):
|
||||
res = {'id': pool['id'],
|
||||
'tenant_id': pool['tenant_id'],
|
||||
'name': pool['name'],
|
||||
@ -453,16 +483,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
|
||||
return self._fields(res, fields)
|
||||
|
||||
def _update_pool_member_info(self, context, pool_id, membersInfo):
|
||||
with context.session.begin(subtransactions=True):
|
||||
member_qry = context.session.query(Member)
|
||||
for member_id in membersInfo:
|
||||
try:
|
||||
member = member_qry.filter_by(id=member_id).one()
|
||||
member.update({'pool_id': pool_id})
|
||||
except exc.NoResultFound:
|
||||
raise loadbalancer.MemberNotFound(member_id=member_id)
|
||||
|
||||
def _create_pool_stats(self, context, pool_id):
|
||||
# This is internal method to add pool statistics. It won't
|
||||
# be exposed to API
|
||||
@ -504,17 +524,17 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
context.session.add(pool_db)
|
||||
|
||||
pool_db = self._get_resource(context, Pool, pool_db['id'])
|
||||
return self._make_pool_dict(context, pool_db)
|
||||
return self._make_pool_dict(pool_db)
|
||||
|
||||
def update_pool(self, context, id, pool):
|
||||
v = pool['pool']
|
||||
p = pool['pool']
|
||||
|
||||
with context.session.begin(subtransactions=True):
|
||||
pool_db = self._get_resource(context, Pool, id)
|
||||
if v:
|
||||
pool_db.update(v)
|
||||
if p:
|
||||
pool_db.update(p)
|
||||
|
||||
return self._make_pool_dict(context, pool_db)
|
||||
return self._make_pool_dict(pool_db)
|
||||
|
||||
def delete_pool(self, context, id):
|
||||
# Check if the pool is in use
|
||||
@ -529,15 +549,15 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
|
||||
def get_pool(self, context, id, fields=None):
|
||||
pool = self._get_resource(context, Pool, id)
|
||||
return self._make_pool_dict(context, pool, fields)
|
||||
return self._make_pool_dict(pool, fields)
|
||||
|
||||
def get_pools(self, context, filters=None, fields=None):
|
||||
collection = self._model_query(context, Pool)
|
||||
collection = self._apply_filters_to_query(collection, Pool, filters)
|
||||
return [self._make_pool_dict(context, c, fields)
|
||||
return [self._make_pool_dict(c, fields)
|
||||
for c in collection.all()]
|
||||
|
||||
def get_stats(self, context, pool_id):
|
||||
def stats(self, context, pool_id):
|
||||
with context.session.begin(subtransactions=True):
|
||||
pool_qry = context.session.query(Pool)
|
||||
try:
|
||||
@ -600,6 +620,7 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
raise loadbalancer.HealthMonitorNotFound(monitor_id=id)
|
||||
|
||||
def get_pool_health_monitor(self, context, id, pool_id, fields=None):
|
||||
# TODO(markmcclain) look into why pool_id is ignored
|
||||
healthmonitor = self._get_resource(context, HealthMonitor, id)
|
||||
return self._make_health_monitor_dict(healthmonitor, fields)
|
||||
|
||||
@ -644,7 +665,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
|
||||
v = member['member']
|
||||
with context.session.begin(subtransactions=True):
|
||||
member_db = self._get_resource(context, Member, id)
|
||||
old_pool_id = member_db['pool_id']
|
||||
if v:
|
||||
member_db.update(v)
|
||||
|
||||
|
@ -58,6 +58,7 @@ def upgrade(active_plugin=None, options=None):
|
||||
sa.Column(u'admin_state_up', sa.Boolean(), nullable=False),
|
||||
sa.Column(u'connection_limit', sa.Integer(), nullable=True),
|
||||
sa.ForeignKeyConstraint(['port_id'], ['ports.id'], ),
|
||||
sa.UniqueConstraint('pool_id'),
|
||||
sa.PrimaryKeyConstraint(u'id')
|
||||
)
|
||||
op.create_table(
|
||||
|
@ -68,6 +68,7 @@ RESOURCE_ATTRIBUTE_MAP = {
|
||||
'is_visible': True},
|
||||
'name': {'allow_post': True, 'allow_put': True,
|
||||
'validate': {'type:string': None},
|
||||
'default': '',
|
||||
'is_visible': True},
|
||||
'description': {'allow_post': True, 'allow_put': True,
|
||||
'validate': {'type:string': None},
|
||||
@ -128,6 +129,7 @@ RESOURCE_ATTRIBUTE_MAP = {
|
||||
'is_visible': True},
|
||||
'name': {'allow_post': True, 'allow_put': True,
|
||||
'validate': {'type:string': None},
|
||||
'default': '',
|
||||
'is_visible': True},
|
||||
'description': {'allow_post': True, 'allow_put': True,
|
||||
'validate': {'type:string': None},
|
||||
|
@ -0,0 +1,67 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
||||
|
||||
import eventlet
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.agent.common import config
|
||||
from quantum.agent.linux import interface
|
||||
from quantum.common import topics
|
||||
from quantum.openstack.common.rpc import service as rpc_service
|
||||
from quantum.openstack.common import service
|
||||
from quantum.plugins.services.agent_loadbalancer.agent import manager
|
||||
|
||||
|
||||
OPTS = [
|
||||
cfg.IntOpt(
|
||||
'periodic_interval',
|
||||
default=10,
|
||||
help=_('Seconds between periodic task runs')
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
class LbaasAgentService(rpc_service.Service):
|
||||
def start(self):
|
||||
super(LbaasAgentService, self).start()
|
||||
self.tg.add_timer(
|
||||
cfg.CONF.periodic_interval,
|
||||
self.manager.run_periodic_tasks,
|
||||
None,
|
||||
None
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
eventlet.monkey_patch()
|
||||
cfg.CONF.register_opts(OPTS)
|
||||
cfg.CONF.register_opts(manager.OPTS)
|
||||
# import interface options just in case the driver uses namespaces
|
||||
cfg.CONF.register_opts(interface.OPTS)
|
||||
config.register_root_helper(cfg.CONF)
|
||||
|
||||
cfg.CONF(project='quantum')
|
||||
config.setup_logging(cfg.CONF)
|
||||
|
||||
mgr = manager.LbaasAgentManager(cfg.CONF)
|
||||
svc = LbaasAgentService(
|
||||
host=cfg.CONF.host,
|
||||
topic=topics.LOADBALANCER_AGENT,
|
||||
manager=mgr
|
||||
)
|
||||
service.launch(svc).wait()
|
81
quantum/plugins/services/agent_loadbalancer/agent/api.py
Normal file
81
quantum/plugins/services/agent_loadbalancer/agent/api.py
Normal file
@ -0,0 +1,81 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
||||
|
||||
from quantum.openstack.common.rpc import proxy
|
||||
|
||||
|
||||
class LbaasAgentApi(proxy.RpcProxy):
|
||||
"""Agent side of the Agent to Plugin RPC API."""
|
||||
|
||||
API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, topic, context, host):
|
||||
super(LbaasAgentApi, self).__init__(topic, self.API_VERSION)
|
||||
self.context = context
|
||||
self.host = host
|
||||
|
||||
def get_ready_devices(self):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('get_ready_devices', host=self.host),
|
||||
topic=self.topic
|
||||
)
|
||||
|
||||
def get_logical_device(self, pool_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg(
|
||||
'get_logical_device',
|
||||
pool_id=pool_id,
|
||||
host=self.host
|
||||
),
|
||||
topic=self.topic
|
||||
)
|
||||
|
||||
def pool_destroyed(self, pool_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('pool_destroyed', pool_id=pool_id, host=self.host),
|
||||
topic=self.topic
|
||||
)
|
||||
|
||||
def plug_vip_port(self, port_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('plug_vip_port', port_id=port_id, host=self.host),
|
||||
topic=self.topic
|
||||
)
|
||||
|
||||
def unplug_vip_port(self, port_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('unplug_vip_port', port_id=port_id, host=self.host),
|
||||
topic=self.topic
|
||||
)
|
||||
|
||||
def update_pool_stats(self, pool_id, stats):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg(
|
||||
'update_pool_stats',
|
||||
pool_id=pool_id,
|
||||
stats=stats,
|
||||
host=self.host
|
||||
),
|
||||
topic=self.topic
|
||||
)
|
221
quantum/plugins/services/agent_loadbalancer/agent/manager.py
Normal file
221
quantum/plugins/services/agent_loadbalancer/agent/manager.py
Normal file
@ -0,0 +1,221 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
||||
|
||||
import weakref
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.agent.common import config
|
||||
from quantum.common import topics
|
||||
from quantum import context
|
||||
from quantum.openstack.common import importutils
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common import periodic_task
|
||||
from quantum.plugins.services.agent_loadbalancer.agent import api
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
NS_PREFIX = 'qlbaas-'
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt(
|
||||
'device_driver',
|
||||
help=_('The driver used to manage the loadbalancing device'),
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'loadbalancer_state_path',
|
||||
default='$state_path/lbaas',
|
||||
help=_('Location to store config and state files'),
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'interface_driver',
|
||||
help=_('The driver used to manage the virtual interface')
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
class LogicalDeviceCache(object):
|
||||
"""Manage a cache of known devices."""
|
||||
|
||||
class Device(object):
|
||||
"""Inner classes used to hold values for weakref lookups"""
|
||||
def __init__(self, port_id, pool_id):
|
||||
self.port_id = port_id
|
||||
self.pool_id = pool_id
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.__dict__ == other.__dict__
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.port_id, self.pool_id))
|
||||
|
||||
def __init__(self):
|
||||
self.devices = set()
|
||||
self.port_lookup = weakref.WeakValueDictionary()
|
||||
self.pool_lookup = weakref.WeakValueDictionary()
|
||||
|
||||
def put(self, device):
|
||||
port_id = device['vip']['port_id']
|
||||
pool_id = device['pool']['id']
|
||||
d = self.Device(device['vip']['port_id'], device['pool']['id'])
|
||||
if d not in self.devices:
|
||||
self.devices.add(d)
|
||||
self.port_lookup[port_id] = d
|
||||
self.pool_lookup[pool_id] = d
|
||||
|
||||
def remove(self, device):
|
||||
if not isinstance(device, self.Device):
|
||||
device = self.Device(
|
||||
device['vip']['port_id'], device['pool']['id']
|
||||
)
|
||||
if device in self.devices:
|
||||
self.devices.remove(device)
|
||||
|
||||
def remove_by_pool_id(self, pool_id):
|
||||
d = self.pool_lookup.get(pool_id)
|
||||
if d:
|
||||
self.devices.remove(d)
|
||||
|
||||
def get_by_pool_id(self, pool_id):
|
||||
return self.pool_lookup.get(pool_id)
|
||||
|
||||
def get_by_port_id(self, port_id):
|
||||
return self.port_lookup.get(port_id)
|
||||
|
||||
def get_pool_ids(self):
|
||||
return self.pool_lookup.keys()
|
||||
|
||||
|
||||
class LbaasAgentManager(periodic_task.PeriodicTasks):
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
try:
|
||||
vif_driver = importutils.import_object(conf.interface_driver, conf)
|
||||
except ImportError:
|
||||
# the driver is optional
|
||||
msg = _('Error importing interface driver: %s')
|
||||
raise SystemExit(msg % conf.interface_driver)
|
||||
vif_driver = None
|
||||
|
||||
try:
|
||||
self.driver = importutils.import_object(
|
||||
conf.device_driver,
|
||||
config.get_root_helper(self.conf),
|
||||
conf.loadbalancer_state_path,
|
||||
vif_driver,
|
||||
self._vip_plug_callback
|
||||
)
|
||||
except ImportError:
|
||||
msg = _('Error importing loadbalancer device driver: %s')
|
||||
raise SystemExit(msg % conf.device_driver)
|
||||
ctx = context.get_admin_context_without_session()
|
||||
self.plugin_rpc = api.LbaasAgentApi(
|
||||
topics.LOADBALANCER_PLUGIN,
|
||||
ctx,
|
||||
conf.host
|
||||
)
|
||||
self.needs_resync = False
|
||||
self.cache = LogicalDeviceCache()
|
||||
|
||||
def initialize_service_hook(self, started_by):
|
||||
self.sync_state()
|
||||
|
||||
@periodic_task.periodic_task
|
||||
def periodic_resync(self, context):
|
||||
if self.needs_resync:
|
||||
self.needs_resync = False
|
||||
self.sync_state()
|
||||
|
||||
@periodic_task.periodic_task(ticks_between_runs=6)
|
||||
def collect_stats(self, context):
|
||||
for pool_id in self.cache.get_pool_ids():
|
||||
try:
|
||||
stats = self.driver.get_stats(pool_id)
|
||||
if stats:
|
||||
self.plugin_rpc.update_pool_stats(pool_id, stats)
|
||||
except Exception:
|
||||
LOG.exception(_('Error upating stats'))
|
||||
self.needs_resync = True
|
||||
|
||||
def _vip_plug_callback(self, action, port):
|
||||
if action == 'plug':
|
||||
self.plugin_rpc.plug_vip_port(port['id'])
|
||||
elif action == 'unplug':
|
||||
self.plugin_rpc.unplug_vip_port(port['id'])
|
||||
|
||||
def sync_state(self):
|
||||
known_devices = set(self.cache.get_pool_ids())
|
||||
try:
|
||||
ready_logical_devices = set(self.plugin_rpc.get_ready_devices())
|
||||
|
||||
for deleted_id in known_devices - ready_logical_devices:
|
||||
self.destroy_device(deleted_id)
|
||||
|
||||
for pool_id in ready_logical_devices:
|
||||
self.refresh_device(pool_id)
|
||||
|
||||
except Exception:
|
||||
LOG.exception(_('Unable to retrieve ready devices'))
|
||||
self.needs_resync = True
|
||||
|
||||
self.remove_orphans()
|
||||
|
||||
def refresh_device(self, pool_id):
|
||||
try:
|
||||
logical_config = self.plugin_rpc.get_logical_device(pool_id)
|
||||
|
||||
if self.driver.exists(pool_id):
|
||||
self.driver.update(logical_config)
|
||||
else:
|
||||
self.driver.create(logical_config)
|
||||
self.cache.put(logical_config)
|
||||
except Exception:
|
||||
LOG.exception(_('Unable to refresh device for pool: %s'), pool_id)
|
||||
self.needs_resync = True
|
||||
|
||||
def destroy_device(self, pool_id):
|
||||
device = self.cache.get_by_pool_id(pool_id)
|
||||
if not device:
|
||||
return
|
||||
try:
|
||||
self.driver.destroy(pool_id)
|
||||
self.plugin_rpc.pool_destroyed(pool_id)
|
||||
except Exception:
|
||||
LOG.exception(_('Unable to destroy device for pool: %s'), pool_id)
|
||||
self.needs_resync = True
|
||||
self.cache.remove(device)
|
||||
|
||||
def remove_orphans(self):
|
||||
try:
|
||||
self.driver.remove_orphans(self.cache.get_pool_ids())
|
||||
except NotImplementedError:
|
||||
pass # Not all drivers will support this
|
||||
|
||||
def reload_pool(self, context, pool_id=None, host=None):
|
||||
"""Handle RPC cast from plugin to reload a pool."""
|
||||
if pool_id:
|
||||
self.refresh_device(pool_id)
|
||||
|
||||
def modify_pool(self, context, pool_id=None, host=None):
|
||||
"""Handle RPC cast from plugin to modify a pool if known to agent."""
|
||||
if self.cache.get_by_pool_id(pool_id):
|
||||
self.refresh_device(pool_id)
|
||||
|
||||
def destroy_pool(self, context, pool_id=None, host=None):
|
||||
"""Handle RPC cast from plugin to destroy a pool if known to agent."""
|
||||
if self.cache.get_by_pool_id(pool_id):
|
||||
self.destroy_device(pool_id)
|
33
quantum/plugins/services/agent_loadbalancer/constants.py
Normal file
33
quantum/plugins/services/agent_loadbalancer/constants.py
Normal file
@ -0,0 +1,33 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
LB_METHOD_ROUND_ROBIN = 'ROUND_ROBIN'
|
||||
LB_METHOD_LEAST_CONNECTIONS = 'LEAST_CONNECTIONS'
|
||||
LB_METHOD_SOURCE_IP = 'SOURCE_IP'
|
||||
|
||||
PROTOCOL_TCP = 'TCP'
|
||||
PROTOCOL_HTTP = 'HTTP'
|
||||
PROTOCOL_HTTPS = 'HTTPS'
|
||||
|
||||
HEALTH_MONITOR_PING = 'PING'
|
||||
HEALTH_MONITOR_TCP = 'TCP'
|
||||
HEALTH_MONITOR_HTTP = 'HTTP'
|
||||
HEALTH_MONITOR_HTTPS = 'HTTPS'
|
||||
|
||||
SESSION_PERSISTENCE_SOURCE_IP = 'SOURCE_IP'
|
||||
SESSION_PERSISTENCE_HTTP_COOKIE = 'HTTP_COOKIE'
|
||||
SESSION_PERSISTENCE_APP_COOKIE = 'APP_COOKIE'
|
@ -0,0 +1,17 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
@ -0,0 +1,17 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
@ -0,0 +1,184 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
||||
|
||||
import itertools
|
||||
|
||||
from quantum.agent.linux import utils
|
||||
from quantum.plugins.common import constants as qconstants
|
||||
from quantum.plugins.services.agent_loadbalancer import constants
|
||||
|
||||
|
||||
PROTOCOL_MAP = {
|
||||
constants.PROTOCOL_TCP: 'tcp',
|
||||
constants.PROTOCOL_HTTP: 'http',
|
||||
constants.PROTOCOL_HTTPS: 'tcp',
|
||||
}
|
||||
|
||||
BALANCE_MAP = {
|
||||
constants.LB_METHOD_ROUND_ROBIN: 'roundrobin',
|
||||
constants.LB_METHOD_LEAST_CONNECTIONS: 'leastconn',
|
||||
constants.LB_METHOD_SOURCE_IP: 'source'
|
||||
}
|
||||
|
||||
ACTIVE = qconstants.ACTIVE
|
||||
|
||||
|
||||
def save_config(conf_path, logical_config, socket_path=None):
|
||||
"""Convert a logical configuration to the HAProxy version"""
|
||||
data = []
|
||||
data.extend(_build_global(logical_config, socket_path=socket_path))
|
||||
data.extend(_build_defaults(logical_config))
|
||||
data.extend(_build_frontend(logical_config))
|
||||
data.extend(_build_backend(logical_config))
|
||||
utils.replace_file(conf_path, '\n'.join(data))
|
||||
|
||||
|
||||
def _build_global(config, socket_path=None):
|
||||
opts = [
|
||||
'daemon',
|
||||
'user nobody',
|
||||
'group nogroup',
|
||||
'log /dev/log local0',
|
||||
'log /dev/log local1 notice'
|
||||
]
|
||||
|
||||
if socket_path:
|
||||
opts.append('stats socket %s mode 0666 level user' % socket_path)
|
||||
|
||||
return itertools.chain(['global'], ('\t' + o for o in opts))
|
||||
|
||||
|
||||
def _build_defaults(config):
|
||||
opts = [
|
||||
'log global',
|
||||
'retries 3',
|
||||
'option redispatch',
|
||||
'timeout connect 5000',
|
||||
'timeout client 50000',
|
||||
'timeout server 50000',
|
||||
]
|
||||
|
||||
return itertools.chain(['defaults'], ('\t' + o for o in opts))
|
||||
|
||||
|
||||
def _build_frontend(config):
|
||||
protocol = config['vip']['protocol']
|
||||
|
||||
opts = [
|
||||
'option tcplog',
|
||||
'bind %s:%d' % (
|
||||
_get_first_ip_from_port(config['vip']['port']),
|
||||
config['vip']['protocol_port']
|
||||
),
|
||||
'mode %s' % PROTOCOL_MAP[protocol],
|
||||
'default_backend %s' % config['pool']['id'],
|
||||
]
|
||||
|
||||
if config['vip']['connection_limit'] >= 0:
|
||||
opts.append('maxconn %s' % config['vip']['connection_limit'])
|
||||
|
||||
if protocol == constants.PROTOCOL_HTTP:
|
||||
opts.append('option forwardfor')
|
||||
|
||||
return itertools.chain(
|
||||
['frontend %s' % config['vip']['id']],
|
||||
('\t' + o for o in opts)
|
||||
)
|
||||
|
||||
|
||||
def _build_backend(config):
|
||||
protocol = config['pool']['protocol']
|
||||
lb_method = config['pool']['lb_method']
|
||||
|
||||
opts = [
|
||||
'mode %s' % PROTOCOL_MAP[protocol],
|
||||
'balance %s' % BALANCE_MAP.get(lb_method, 'roundrobin')
|
||||
]
|
||||
|
||||
if protocol == constants.PROTOCOL_HTTP:
|
||||
opts.append('option forwardfor')
|
||||
|
||||
# add the first health_monitor (if available)
|
||||
server_addon, health_opts = _get_server_health_option(config)
|
||||
opts.extend(health_opts)
|
||||
|
||||
# add the members
|
||||
opts.extend(
|
||||
(('server %(id)s %(address)s:%(protocol_port)s '
|
||||
'weight %(weight)s') % member) + server_addon
|
||||
for member in config['members']
|
||||
if (member['status'] == ACTIVE and member['admin_state_up'])
|
||||
)
|
||||
|
||||
return itertools.chain(
|
||||
['backend %s' % config['pool']['id']],
|
||||
('\t' + o for o in opts)
|
||||
)
|
||||
|
||||
|
||||
def _get_first_ip_from_port(port):
|
||||
for fixed_ip in port['fixed_ips']:
|
||||
return fixed_ip['ip_address']
|
||||
|
||||
|
||||
def _get_server_health_option(config):
|
||||
"""return the first active health option"""
|
||||
for monitor in config['healthmonitors']:
|
||||
if monitor['status'] == ACTIVE and monitor['admin_state_up']:
|
||||
break
|
||||
else:
|
||||
return '', []
|
||||
|
||||
server_addon = ' check inter %(delay)ds fall %(max_retries)d' % monitor
|
||||
opts = [
|
||||
'timeout check %ds' % monitor['timeout']
|
||||
]
|
||||
|
||||
if monitor['type'] in (constants.HEALTH_MONITOR_HTTP,
|
||||
constants.HEALTH_MONITOR_HTTPS):
|
||||
opts.append('option httpchk %(http_method)s %(url_path)s' % monitor)
|
||||
opts.append(
|
||||
'http-check expect rstatus %s' %
|
||||
'|'.join(_expand_expected_codes(monitor['expected_codes']))
|
||||
)
|
||||
|
||||
if monitor['type'] == constants.HEALTH_MONITOR_HTTPS:
|
||||
opts.append('option ssl-hello-chk')
|
||||
|
||||
return server_addon, opts
|
||||
|
||||
|
||||
def _expand_expected_codes(codes):
|
||||
"""Expand the expected code string in set of codes.
|
||||
|
||||
200-204 -> 200, 201, 202, 204
|
||||
200, 203 -> 200, 203
|
||||
"""
|
||||
|
||||
retval = set()
|
||||
for code in codes.replace(',', ' ').split(' '):
|
||||
code = code.strip()
|
||||
|
||||
if not code:
|
||||
continue
|
||||
elif '-' in code:
|
||||
low, hi = code.split('-')[:2]
|
||||
retval.update(str(i) for i in xrange(int(low), int(hi)))
|
||||
else:
|
||||
retval.add(code)
|
||||
return retval
|
@ -0,0 +1,182 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
||||
import os
|
||||
import shutil
|
||||
import socket
|
||||
|
||||
import netaddr
|
||||
|
||||
from quantum.agent.linux import ip_lib
|
||||
from quantum.common import exceptions
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
|
||||
cfg as hacfg
|
||||
)
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
NS_PREFIX = 'qlbaas-'
|
||||
|
||||
|
||||
class HaproxyNSDriver(object):
|
||||
def __init__(self, root_helper, state_path, vif_driver, vip_plug_callback):
|
||||
self.root_helper = root_helper
|
||||
self.state_path = state_path
|
||||
self.vif_driver = vif_driver
|
||||
self.vip_plug_callback = vip_plug_callback
|
||||
self.pool_to_port_id = {}
|
||||
|
||||
def create(self, logical_config):
|
||||
pool_id = logical_config['pool']['id']
|
||||
namespace = get_ns_name(pool_id)
|
||||
|
||||
self._plug(namespace, logical_config['vip']['port'])
|
||||
self._spawn(logical_config)
|
||||
|
||||
def update(self, logical_config):
|
||||
pool_id = logical_config['pool']['id']
|
||||
pid_path = self._get_state_file_path(pool_id, 'pid')
|
||||
|
||||
extra_args = ['-sf']
|
||||
extra_args.extend(p.strip() for p in open(pid_path, 'r'))
|
||||
self._spawn(logical_config, extra_args)
|
||||
|
||||
def _spawn(self, logical_config, extra_cmd_args=()):
|
||||
pool_id = logical_config['pool']['id']
|
||||
namespace = get_ns_name(pool_id)
|
||||
conf_path = self._get_state_file_path(pool_id, 'conf')
|
||||
pid_path = self._get_state_file_path(pool_id, 'pid')
|
||||
sock_path = self._get_state_file_path(pool_id, 'sock')
|
||||
|
||||
hacfg.save_config(conf_path, logical_config, sock_path)
|
||||
cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
|
||||
cmd.extend(extra_cmd_args)
|
||||
|
||||
ns = ip_lib.IPWrapper(self.root_helper, namespace)
|
||||
ns.netns.execute(cmd)
|
||||
|
||||
# remember the pool<>port mapping
|
||||
self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id']
|
||||
|
||||
def destroy(self, pool_id):
|
||||
namespace = get_ns_name(pool_id)
|
||||
ns = ip_lib.IPWrapper(self.root_helper, namespace)
|
||||
pid_path = self._get_state_file_path(pool_id, 'pid')
|
||||
sock_path = self._get_state_file_path(pool_id, 'sock')
|
||||
|
||||
# kill the process
|
||||
kill_pids_in_file(ns, pid_path)
|
||||
|
||||
# unplug the ports
|
||||
if pool_id in self.pool_to_port_id:
|
||||
self._unplug(namespace, self.pool_to_port_id[pool_id])
|
||||
|
||||
# remove the configuration directory
|
||||
conf_dir = os.path.dirname(self._get_state_file_path(pool_id, ''))
|
||||
if os.path.isdir(conf_dir):
|
||||
shutil.rmtree(conf_dir)
|
||||
ns.garbage_collect_namespace()
|
||||
|
||||
def exists(self, pool_id):
|
||||
namespace = get_ns_name(pool_id)
|
||||
root_ns = ip_lib.IPWrapper(self.root_helper)
|
||||
|
||||
socket_path = self._get_state_file_path(pool_id, 'sock')
|
||||
if root_ns.netns.exists(namespace) and os.path.exists(socket_path):
|
||||
try:
|
||||
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
s.connect(socket_path)
|
||||
return True
|
||||
except socket.error:
|
||||
pass
|
||||
return False
|
||||
|
||||
def get_stats(self, pool_id):
|
||||
pass
|
||||
|
||||
def remove_orphans(self, known_pool_ids):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True):
|
||||
"""Returns the file name for a given kind of config file."""
|
||||
confs_dir = os.path.abspath(os.path.normpath(self.state_path))
|
||||
conf_dir = os.path.join(confs_dir, pool_id)
|
||||
if ensure_state_dir:
|
||||
if not os.path.isdir(conf_dir):
|
||||
os.makedirs(conf_dir, 0755)
|
||||
return os.path.join(conf_dir, kind)
|
||||
|
||||
def _plug(self, namespace, port, reuse_existing=True):
|
||||
self.vip_plug_callback('plug', port)
|
||||
interface_name = self.vif_driver.get_device_name(Wrap(port))
|
||||
|
||||
if ip_lib.device_exists(interface_name, self.root_helper, namespace):
|
||||
if not reuse_existing:
|
||||
raise exceptions.PreexistingDeviceFailure(
|
||||
dev_name=interface_name
|
||||
)
|
||||
else:
|
||||
self.vif_driver.plug(
|
||||
port['network_id'],
|
||||
port['id'],
|
||||
interface_name,
|
||||
port['mac_address'],
|
||||
namespace=namespace
|
||||
)
|
||||
|
||||
cidrs = [
|
||||
'%s/%s' % (ip['ip_address'],
|
||||
netaddr.IPNetwork(ip['subnet']['cidr']).prefixlen)
|
||||
for ip in port['fixed_ips']
|
||||
]
|
||||
self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace)
|
||||
|
||||
def _unplug(self, namespace, port_id):
|
||||
port_stub = {'id': port_id}
|
||||
self.vip_plug_callback('unplug', port_stub)
|
||||
interface_name = self.vif_driver.get_device_name(Wrap(port_stub))
|
||||
self.vif_driver.unplug(interface_name, namespace=namespace)
|
||||
|
||||
|
||||
# NOTE (markmcclain) For compliance with interface.py which expects objects
|
||||
class Wrap(object):
|
||||
"""A light attribute wrapper for compatibility with the interface lib."""
|
||||
def __init__(self, d):
|
||||
self.__dict__.update(d)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.__dict__[key]
|
||||
|
||||
|
||||
def get_ns_name(namespace_id):
|
||||
return NS_PREFIX + namespace_id
|
||||
|
||||
|
||||
def kill_pids_in_file(namespace_wrapper, pid_path):
|
||||
if os.path.exists(pid_path):
|
||||
with open(pid_path, 'r') as pids:
|
||||
for pid in pids:
|
||||
pid = pid.strip()
|
||||
try:
|
||||
namespace_wrapper.netns.execute(
|
||||
['kill', '-9', pid.strip()]
|
||||
)
|
||||
except RuntimeError:
|
||||
LOG.exception(
|
||||
_('Unable to kill haproxy process: %s'),
|
||||
pid
|
||||
)
|
338
quantum/plugins/services/agent_loadbalancer/plugin.py
Normal file
338
quantum/plugins/services/agent_loadbalancer/plugin.py
Normal file
@ -0,0 +1,338 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.common import exceptions as q_exc
|
||||
from quantum.common import rpc as q_rpc
|
||||
from quantum.common import topics
|
||||
from quantum.db import api as qdbapi
|
||||
from quantum.db.loadbalancer import loadbalancer_db
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common import rpc
|
||||
from quantum.openstack.common.rpc import proxy
|
||||
from quantum.plugins.common import constants
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
ACTIVE_PENDING = (
|
||||
constants.ACTIVE,
|
||||
constants.PENDING_CREATE,
|
||||
constants.PENDING_UPDATE
|
||||
)
|
||||
|
||||
|
||||
class LoadBalancerCallbacks(object):
|
||||
RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, plugin):
|
||||
self.plugin = plugin
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return q_rpc.PluginRpcDispatcher([self])
|
||||
|
||||
def get_ready_devices(self, context, host=None):
|
||||
with context.session.begin(subtransactions=True):
|
||||
qry = context.session.query(
|
||||
loadbalancer_db.Vip, loadbalancer_db.Pool
|
||||
)
|
||||
qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
|
||||
qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
|
||||
up = True # makes pep8 and sqlalchemy happy
|
||||
qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
|
||||
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
|
||||
return [p.id for v, p in qry.all()]
|
||||
|
||||
def get_logical_device(self, context, pool_id=None, activate=True,
|
||||
**kwargs):
|
||||
with context.session.begin(subtransactions=True):
|
||||
qry = context.session.query(loadbalancer_db.Pool)
|
||||
qry = qry.filter_by(id=pool_id)
|
||||
pool = qry.one()
|
||||
|
||||
if activate:
|
||||
# set all resources to active
|
||||
if pool.status in ACTIVE_PENDING:
|
||||
pool.status = constants.ACTIVE
|
||||
|
||||
if pool.vip.status in ACTIVE_PENDING:
|
||||
pool.vip.status = constants.ACTIVE
|
||||
|
||||
for m in pool.members:
|
||||
if m.status in ACTIVE_PENDING:
|
||||
m.status = constants.ACTIVE
|
||||
|
||||
for hm in pool.monitors:
|
||||
if hm.monitor.status in ACTIVE_PENDING:
|
||||
hm.monitor.status = constants.ACTIVE
|
||||
|
||||
if (pool.status != constants.ACTIVE
|
||||
or pool.vip.status != constants.ACTIVE):
|
||||
raise Exception(_('Expected active pool and vip'))
|
||||
|
||||
retval = {}
|
||||
retval['pool'] = self.plugin._make_pool_dict(pool)
|
||||
retval['vip'] = self.plugin._make_vip_dict(pool.vip)
|
||||
retval['vip']['port'] = (
|
||||
self.plugin._core_plugin._make_port_dict(pool.vip.port)
|
||||
)
|
||||
for fixed_ip in retval['vip']['port']['fixed_ips']:
|
||||
fixed_ip['subnet'] = (
|
||||
self.plugin._core_plugin.get_subnet(
|
||||
context,
|
||||
fixed_ip['subnet_id']
|
||||
)
|
||||
)
|
||||
retval['members'] = [
|
||||
self.plugin._make_member_dict(m)
|
||||
for m in pool.members if m.status == constants.ACTIVE
|
||||
]
|
||||
retval['healthmonitors'] = [
|
||||
self.plugin._make_health_monitor_dict(hm.monitor)
|
||||
for hm in pool.monitors
|
||||
if hm.monitor.status == constants.ACTIVE
|
||||
]
|
||||
|
||||
return retval
|
||||
|
||||
def pool_destroyed(self, context, pool_id=None, host=None):
|
||||
"""Agent confirmation hook that a pool has been destroyed.
|
||||
|
||||
This method exists for subclasses to change the deletion
|
||||
behavior.
|
||||
"""
|
||||
pass
|
||||
|
||||
def plug_vip_port(self, context, port_id=None, host=None):
|
||||
if not port_id:
|
||||
return
|
||||
|
||||
port = self.plugin._core_plugin.get_port(
|
||||
context,
|
||||
port_id
|
||||
)
|
||||
|
||||
port['admin_state_up'] = True
|
||||
port['device_owner'] = 'quantum:' + constants.LOADBALANCER
|
||||
port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
|
||||
|
||||
self.plugin._core_plugin.update_port(
|
||||
context,
|
||||
port_id,
|
||||
{'port': port}
|
||||
)
|
||||
|
||||
def unplug_vip_port(self, context, port_id=None, host=None):
|
||||
if not port_id:
|
||||
return
|
||||
|
||||
port = self.plugin._core_plugin.get_port(
|
||||
context,
|
||||
port_id
|
||||
)
|
||||
|
||||
port['admin_state_up'] = False
|
||||
port['device_owner'] = ''
|
||||
port['device_id'] = ''
|
||||
|
||||
try:
|
||||
self.plugin._core_plugin.update_port(
|
||||
context,
|
||||
port_id,
|
||||
{'port': port}
|
||||
)
|
||||
|
||||
except q_exc.PortNotFound:
|
||||
msg = _('Unable to find port %s to unplug. This can occur when '
|
||||
'the Vip has been deleted first.')
|
||||
LOG.debug(msg, port_id)
|
||||
|
||||
def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
|
||||
# TODO (markmcclain): add stats collection
|
||||
pass
|
||||
|
||||
|
||||
class LoadBalancerAgentApi(proxy.RpcProxy):
|
||||
"""Plugin side of plugin to agent RPC API."""
|
||||
|
||||
API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, topic, host):
|
||||
super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
|
||||
self.host = host
|
||||
|
||||
def reload_pool(self, context, pool_id):
|
||||
return self.cast(
|
||||
context,
|
||||
self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
|
||||
topic=self.topic
|
||||
)
|
||||
|
||||
def destroy_pool(self, context, pool_id):
|
||||
return self.cast(
|
||||
context,
|
||||
self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
|
||||
topic=self.topic
|
||||
)
|
||||
|
||||
def modify_pool(self, context, pool_id):
|
||||
return self.cast(
|
||||
context,
|
||||
self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
|
||||
topic=self.topic
|
||||
)
|
||||
|
||||
|
||||
class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
|
||||
|
||||
"""
|
||||
Implementation of the Quantum Loadbalancer Service Plugin.
|
||||
|
||||
This class manages the workflow of LBaaS request/response.
|
||||
Most DB related works are implemented in class
|
||||
loadbalancer_db.LoadBalancerPluginDb.
|
||||
"""
|
||||
supported_extension_aliases = ["lbaas"]
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Do the initialization for the loadbalancer service plugin here.
|
||||
"""
|
||||
qdbapi.register_models()
|
||||
|
||||
self.callbacks = LoadBalancerCallbacks(self)
|
||||
|
||||
self.conn = rpc.create_connection(new=True)
|
||||
self.conn.create_consumer(
|
||||
topics.LOADBALANCER_PLUGIN,
|
||||
self.callbacks.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
|
||||
self.agent_rpc = LoadBalancerAgentApi(
|
||||
topics.LOADBALANCER_AGENT,
|
||||
cfg.CONF.host
|
||||
)
|
||||
|
||||
def get_plugin_type(self):
|
||||
return constants.LOADBALANCER
|
||||
|
||||
def get_plugin_description(self):
|
||||
return "Quantum LoadBalancer Service Plugin"
|
||||
|
||||
def create_vip(self, context, vip):
|
||||
vip['vip']['status'] = constants.PENDING_CREATE
|
||||
v = super(LoadBalancerPlugin, self).create_vip(context, vip)
|
||||
self.agent_rpc.reload_pool(context, v['pool_id'])
|
||||
return v
|
||||
|
||||
def update_vip(self, context, id, vip):
|
||||
if 'status' not in vip['vip']:
|
||||
vip['vip']['status'] = constants.PENDING_UPDATE
|
||||
v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
|
||||
if v['status'] in ACTIVE_PENDING:
|
||||
self.agent_rpc.reload_pool(context, v['pool_id'])
|
||||
else:
|
||||
self.agent_rpc.destroy_pool(context, v['pool_id'])
|
||||
return v
|
||||
|
||||
def delete_vip(self, context, id):
|
||||
vip = self.get_vip(context, id)
|
||||
super(LoadBalancerPlugin, self).delete_vip(context, id)
|
||||
self.agent_rpc.destroy_pool(context, vip['pool_id'])
|
||||
pass
|
||||
|
||||
def create_pool(self, context, pool):
|
||||
p = super(LoadBalancerPlugin, self).create_pool(context, pool)
|
||||
# don't notify here because a pool needs a vip to be useful
|
||||
return p
|
||||
|
||||
def update_pool(self, context, id, pool):
|
||||
if 'status' not in pool['pool']:
|
||||
pool['pool']['status'] = constants.PENDING_UPDATE
|
||||
p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
|
||||
if p['status'] in ACTIVE_PENDING:
|
||||
self.agent_rpc.reload_pool(context, p['id'])
|
||||
else:
|
||||
self.agent_rpc.destroy_pool(context, p['id'])
|
||||
return p
|
||||
|
||||
def delete_pool(self, context, id):
|
||||
super(LoadBalancerPlugin, self).delete_pool(context, id)
|
||||
self.agent_rpc.destroy_pool(context, id)
|
||||
|
||||
def create_member(self, context, member):
|
||||
m = super(LoadBalancerPlugin, self).create_member(context, member)
|
||||
self.agent_rpc.modify_pool(context, m['pool_id'])
|
||||
return m
|
||||
|
||||
def update_member(self, context, id, member):
|
||||
if 'status' not in member['member']:
|
||||
member['member']['status'] = constants.PENDING_UPDATE
|
||||
m = super(LoadBalancerPlugin, self).update_member(context, id, member)
|
||||
self.agent_rpc.modify_pool(context, m['pool_id'])
|
||||
return m
|
||||
|
||||
def delete_member(self, context, id):
|
||||
m = self.get_member(context, id)
|
||||
super(LoadBalancerPlugin, self).delete_member(context, id)
|
||||
self.agent_rpc.modify_pool(context, m['pool_id'])
|
||||
|
||||
def update_health_monitor(self, context, id, health_monitor):
|
||||
if 'status' not in health_monitor['health_monitor']:
|
||||
health_monitor['health_monitor']['status'] = (
|
||||
constants.PENDING_UPDATE
|
||||
)
|
||||
hm = super(LoadBalancerPlugin, self).update_health_monitor(
|
||||
context,
|
||||
id,
|
||||
health_monitor
|
||||
)
|
||||
|
||||
with context.session.begin(subtransactions=True):
|
||||
qry = context.session.query(
|
||||
loadbalancer_db.PoolMonitorAssociation
|
||||
)
|
||||
qry = qry.filter_by(monitor_id=hm['id'])
|
||||
|
||||
for assoc in qry.all():
|
||||
self.agent_rpc.modify_pool(context, assoc['pool_id'])
|
||||
return hm
|
||||
|
||||
def delete_health_monitor(self, context, id):
|
||||
with context.session.begin(subtransactions=True):
|
||||
qry = context.session.query(
|
||||
loadbalancer_db.PoolMonitorAssociation
|
||||
)
|
||||
qry = qry.filter_by(monitor_id=id)
|
||||
|
||||
pool_ids = [a['pool_id'] for a in qry.all()]
|
||||
super(LoadBalancerPlugin, self).delete_health_monitor(context, id)
|
||||
for pid in pool_ids:
|
||||
self.agent_rpc.modify_pool(context, pid)
|
||||
|
||||
def create_pool_health_monitor(self, context, health_monitor, pool_id):
|
||||
retval = super(LoadBalancerPlugin, self).create_pool_health_monitor(
|
||||
context,
|
||||
health_monitor,
|
||||
pool_id
|
||||
)
|
||||
self.agent_rpc.modify_pool(context, pool_id)
|
||||
|
||||
return retval
|
@ -1,252 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from quantum.db import api as qdbapi
|
||||
from quantum.db import model_base
|
||||
from quantum.db.loadbalancer import loadbalancer_db
|
||||
from quantum.extensions import loadbalancer
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.plugins.common import constants
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
|
||||
|
||||
"""
|
||||
Implementation of the Quantum Loadbalancer Service Plugin.
|
||||
|
||||
This class manages the workflow of LBaaS request/response.
|
||||
Most DB related works are implemented in class
|
||||
loadbalancer_db.LoadBalancerPluginDb.
|
||||
"""
|
||||
supported_extension_aliases = ["lbaas"]
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Do the initialization for the loadbalancer service plugin here.
|
||||
"""
|
||||
qdbapi.register_models(base=model_base.BASEV2)
|
||||
|
||||
# TODO: we probably need to setup RPC channel (to talk to LbAgent) here
|
||||
|
||||
def get_plugin_type(self):
|
||||
return constants.LOADBALANCER
|
||||
|
||||
def get_plugin_description(self):
|
||||
return "Quantum LoadBalancer Service Plugin"
|
||||
|
||||
def create_vip(self, context, vip):
|
||||
v = super(LoadBalancerPlugin, self).create_vip(context, vip)
|
||||
self.update_status(context, loadbalancer_db.Vip, v['id'],
|
||||
constants.PENDING_CREATE)
|
||||
LOG.debug(_("Create vip: %s"), v['id'])
|
||||
|
||||
# If we adopt asynchronous mode, this method should return immediately
|
||||
# and let client to query the object status. The plugin will listen on
|
||||
# the event from device and update the object status by calling
|
||||
# self.update_state(context, Vip, id, ACTIVE/ERROR)
|
||||
#
|
||||
# In synchronous mode, send the request to device here and wait for
|
||||
# response. Eventually update the object status prior to the return.
|
||||
v_query = self.get_vip(context, v['id'])
|
||||
return v_query
|
||||
|
||||
def update_vip(self, context, id, vip):
|
||||
v_query = self.get_vip(
|
||||
context, id, fields=["status"])
|
||||
if v_query['status'] in [
|
||||
constants.PENDING_DELETE, constants.ERROR]:
|
||||
raise loadbalancer.StateInvalid(id=id,
|
||||
state=v_query['status'])
|
||||
|
||||
v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
|
||||
self.update_status(context, loadbalancer_db.Vip, id,
|
||||
constants.PENDING_UPDATE)
|
||||
LOG.debug(_("Update vip: %s"), id)
|
||||
|
||||
# TODO notify lbagent
|
||||
v_rt = self.get_vip(context, id)
|
||||
return v_rt
|
||||
|
||||
def delete_vip(self, context, id):
|
||||
self.update_status(context, loadbalancer_db.Vip, id,
|
||||
constants.PENDING_DELETE)
|
||||
LOG.debug(_("Delete vip: %s"), id)
|
||||
|
||||
# TODO notify lbagent
|
||||
super(LoadBalancerPlugin, self).delete_vip(context, id)
|
||||
|
||||
def get_vip(self, context, id, fields=None):
|
||||
res = super(LoadBalancerPlugin, self).get_vip(context, id, fields)
|
||||
LOG.debug(_("Get vip: %s"), id)
|
||||
return res
|
||||
|
||||
def get_vips(self, context, filters=None, fields=None):
|
||||
res = super(LoadBalancerPlugin, self).get_vips(
|
||||
context, filters, fields)
|
||||
LOG.debug(_("Get vips"))
|
||||
return res
|
||||
|
||||
def create_pool(self, context, pool):
|
||||
p = super(LoadBalancerPlugin, self).create_pool(context, pool)
|
||||
self.update_status(context, loadbalancer_db.Pool, p['id'],
|
||||
constants.PENDING_CREATE)
|
||||
LOG.debug(_("Create pool: %s"), p['id'])
|
||||
|
||||
# TODO notify lbagent
|
||||
p_rt = self.get_pool(context, p['id'])
|
||||
return p_rt
|
||||
|
||||
def update_pool(self, context, id, pool):
|
||||
p_query = self.get_pool(context, id, fields=["status"])
|
||||
if p_query['status'] in [
|
||||
constants.PENDING_DELETE, constants.ERROR]:
|
||||
raise loadbalancer.StateInvalid(id=id,
|
||||
state=p_query['status'])
|
||||
p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
|
||||
LOG.debug(_("Update pool: %s"), p['id'])
|
||||
# TODO notify lbagent
|
||||
p_rt = self.get_pool(context, id)
|
||||
return p_rt
|
||||
|
||||
def delete_pool(self, context, id):
|
||||
self.update_status(context, loadbalancer_db.Pool, id,
|
||||
constants.PENDING_DELETE)
|
||||
# TODO notify lbagent
|
||||
super(LoadBalancerPlugin, self).delete_pool(context, id)
|
||||
LOG.debug(_("Delete pool: %s"), id)
|
||||
|
||||
def get_pool(self, context, id, fields=None):
|
||||
res = super(LoadBalancerPlugin, self).get_pool(context, id, fields)
|
||||
LOG.debug(_("Get pool: %s"), id)
|
||||
return res
|
||||
|
||||
def get_pools(self, context, filters=None, fields=None):
|
||||
res = super(LoadBalancerPlugin, self).get_pools(
|
||||
context, filters, fields)
|
||||
LOG.debug(_("Get Pools"))
|
||||
return res
|
||||
|
||||
def stats(self, context, pool_id):
|
||||
res = super(LoadBalancerPlugin, self).get_stats(context, pool_id)
|
||||
LOG.debug(_("Get stats of Pool: %s"), pool_id)
|
||||
return res
|
||||
|
||||
def create_pool_health_monitor(self, context, health_monitor, pool_id):
|
||||
m = super(LoadBalancerPlugin, self).create_pool_health_monitor(
|
||||
context, health_monitor, pool_id)
|
||||
LOG.debug(_("Create health_monitor of pool: %s"), pool_id)
|
||||
return m
|
||||
|
||||
def get_pool_health_monitor(self, context, id, pool_id, fields=None):
|
||||
m = super(LoadBalancerPlugin, self).get_pool_health_monitor(
|
||||
context, id, pool_id, fields)
|
||||
LOG.debug(_("Get health_monitor of pool: %s"), pool_id)
|
||||
return m
|
||||
|
||||
def delete_pool_health_monitor(self, context, id, pool_id):
|
||||
super(LoadBalancerPlugin, self).delete_pool_health_monitor(
|
||||
context, id, pool_id)
|
||||
LOG.debug(_("Delete health_monitor %(id)s of pool: %(pool_id)s"),
|
||||
{"id": id, "pool_id": pool_id})
|
||||
|
||||
def get_member(self, context, id, fields=None):
|
||||
res = super(LoadBalancerPlugin, self).get_member(
|
||||
context, id, fields)
|
||||
LOG.debug(_("Get member: %s"), id)
|
||||
return res
|
||||
|
||||
def get_members(self, context, filters=None, fields=None):
|
||||
res = super(LoadBalancerPlugin, self).get_members(
|
||||
context, filters, fields)
|
||||
LOG.debug(_("Get members"))
|
||||
return res
|
||||
|
||||
def create_member(self, context, member):
|
||||
m = super(LoadBalancerPlugin, self).create_member(context, member)
|
||||
self.update_status(context, loadbalancer_db.Member, m['id'],
|
||||
constants.PENDING_CREATE)
|
||||
LOG.debug(_("Create member: %s"), m['id'])
|
||||
# TODO notify lbagent
|
||||
m_rt = self.get_member(context, m['id'])
|
||||
return m_rt
|
||||
|
||||
def update_member(self, context, id, member):
|
||||
m_query = self.get_member(context, id, fields=["status"])
|
||||
if m_query['status'] in [
|
||||
constants.PENDING_DELETE, constants.ERROR]:
|
||||
raise loadbalancer.StateInvalid(id=id,
|
||||
state=m_query['status'])
|
||||
m = super(LoadBalancerPlugin, self).update_member(context, id, member)
|
||||
self.update_status(context, loadbalancer_db.Member, id,
|
||||
constants.PENDING_UPDATE)
|
||||
LOG.debug(_("Update member: %s"), m['id'])
|
||||
# TODO notify lbagent
|
||||
m_rt = self.get_member(context, id)
|
||||
return m_rt
|
||||
|
||||
def delete_member(self, context, id):
|
||||
self.update_status(context, loadbalancer_db.Member, id,
|
||||
constants.PENDING_DELETE)
|
||||
LOG.debug(_("Delete member: %s"), id)
|
||||
# TODO notify lbagent
|
||||
super(LoadBalancerPlugin, self).delete_member(context, id)
|
||||
|
||||
def get_health_monitor(self, context, id, fields=None):
|
||||
res = super(LoadBalancerPlugin, self).get_health_monitor(
|
||||
context, id, fields)
|
||||
LOG.debug(_("Get health_monitor: %s"), id)
|
||||
return res
|
||||
|
||||
def get_health_monitors(self, context, filters=None, fields=None):
|
||||
res = super(LoadBalancerPlugin, self).get_health_monitors(
|
||||
context, filters, fields)
|
||||
LOG.debug(_("Get health_monitors"))
|
||||
return res
|
||||
|
||||
def create_health_monitor(self, context, health_monitor):
|
||||
h = super(LoadBalancerPlugin, self).create_health_monitor(
|
||||
context, health_monitor)
|
||||
self.update_status(context, loadbalancer_db.HealthMonitor, h['id'],
|
||||
constants.PENDING_CREATE)
|
||||
LOG.debug(_("Create health_monitor: %s"), h['id'])
|
||||
# TODO notify lbagent
|
||||
h_rt = self.get_health_monitor(context, h['id'])
|
||||
return h_rt
|
||||
|
||||
def update_health_monitor(self, context, id, health_monitor):
|
||||
h_query = self.get_health_monitor(context, id, fields=["status"])
|
||||
if h_query['status'] in [
|
||||
constants.PENDING_DELETE, constants.ERROR]:
|
||||
raise loadbalancer.StateInvalid(id=id,
|
||||
state=h_query['status'])
|
||||
h = super(LoadBalancerPlugin, self).update_health_monitor(
|
||||
context, id, health_monitor)
|
||||
self.update_status(context, loadbalancer_db.HealthMonitor, id,
|
||||
constants.PENDING_UPDATE)
|
||||
LOG.debug(_("Update health_monitor: %s"), h['id'])
|
||||
# TODO notify lbagent
|
||||
h_rt = self.get_health_monitor(context, id)
|
||||
return h_rt
|
||||
|
||||
def delete_health_monitor(self, context, id):
|
||||
self.update_status(context, loadbalancer_db.HealthMonitor, id,
|
||||
constants.PENDING_DELETE)
|
||||
LOG.debug(_("Delete health_monitor: %s"), id)
|
||||
super(LoadBalancerPlugin, self).delete_health_monitor(context, id)
|
@ -34,7 +34,9 @@ import quantum.extensions
|
||||
from quantum.extensions import loadbalancer
|
||||
from quantum.manager import QuantumManager
|
||||
from quantum.plugins.common import constants
|
||||
from quantum.plugins.services.loadbalancer import loadbalancerPlugin
|
||||
from quantum.plugins.services.agent_loadbalancer import (
|
||||
plugin as loadbalancer_plugin
|
||||
)
|
||||
from quantum.tests.unit import test_db_plugin
|
||||
from quantum.tests.unit import test_extensions
|
||||
from quantum.tests.unit import testlib_api
|
||||
@ -46,8 +48,7 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
|
||||
DB_LB_PLUGIN_KLASS = (
|
||||
"quantum.plugins.services.loadbalancer."
|
||||
"loadbalancerPlugin.LoadBalancerPlugin"
|
||||
"quantum.plugins.services.agent_loadbalancer.plugin.LoadBalancerPlugin"
|
||||
)
|
||||
ROOTDIR = os.path.dirname(__file__) + '../../../..'
|
||||
ETCDIR = os.path.join(ROOTDIR, 'etc')
|
||||
@ -74,7 +75,7 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
|
||||
self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
|
||||
|
||||
plugin = loadbalancerPlugin.LoadBalancerPlugin()
|
||||
plugin = loadbalancer_plugin.LoadBalancerPlugin()
|
||||
ext_mgr = PluginAwareExtensionManager(
|
||||
extensions_path,
|
||||
{constants.LOADBALANCER: plugin}
|
||||
|
17
quantum/tests/unit/services/__init__.py
Normal file
17
quantum/tests/unit/services/__init__.py
Normal file
@ -0,0 +1,17 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
17
quantum/tests/unit/services/agent_loadbalancer/__init__.py
Normal file
17
quantum/tests/unit/services/agent_loadbalancer/__init__.py
Normal file
@ -0,0 +1,17 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
@ -0,0 +1,17 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
135
quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py
Normal file
135
quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py
Normal file
@ -0,0 +1,135 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
||||
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
from quantum.plugins.services.agent_loadbalancer.agent import api
|
||||
|
||||
|
||||
class TestApiCache(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(TestApiCache, self).setUp()
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
self.api = api.LbaasAgentApi('topic', mock.sentinel.context, 'host')
|
||||
self.make_msg = mock.patch.object(self.api, 'make_msg').start()
|
||||
self.mock_call = mock.patch.object(self.api, 'call').start()
|
||||
|
||||
def test_init(self):
|
||||
self.assertEqual(self.api.host, 'host')
|
||||
self.assertEqual(self.api.context, mock.sentinel.context)
|
||||
|
||||
def test_get_ready_devices(self):
|
||||
self.assertEqual(
|
||||
self.api.get_ready_devices(),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with('get_ready_devices', host='host')
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value,
|
||||
topic='topic'
|
||||
)
|
||||
|
||||
def test_get_logical_device(self):
|
||||
self.assertEqual(
|
||||
self.api.get_logical_device('pool_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'get_logical_device',
|
||||
pool_id='pool_id',
|
||||
host='host')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value,
|
||||
topic='topic'
|
||||
)
|
||||
|
||||
def test_pool_destroyed(self):
|
||||
self.assertEqual(
|
||||
self.api.pool_destroyed('pool_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'pool_destroyed',
|
||||
pool_id='pool_id',
|
||||
host='host')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value,
|
||||
topic='topic'
|
||||
)
|
||||
|
||||
def test_plug_vip_port(self):
|
||||
self.assertEqual(
|
||||
self.api.plug_vip_port('port_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'plug_vip_port',
|
||||
port_id='port_id',
|
||||
host='host')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value,
|
||||
topic='topic'
|
||||
)
|
||||
|
||||
def test_unplug_vip_port(self):
|
||||
self.assertEqual(
|
||||
self.api.unplug_vip_port('port_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'unplug_vip_port',
|
||||
port_id='port_id',
|
||||
host='host')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value,
|
||||
topic='topic'
|
||||
)
|
||||
|
||||
def test_update_pool_stats(self):
|
||||
self.assertEqual(
|
||||
self.api.update_pool_stats('pool_id', {'stat': 'stat'}),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'update_pool_stats',
|
||||
pool_id='pool_id',
|
||||
stats={'stat': 'stat'},
|
||||
host='host')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value,
|
||||
topic='topic'
|
||||
)
|
@ -0,0 +1,55 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
||||
|
||||
import contextlib
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
import testtools
|
||||
|
||||
from quantum.plugins.services.agent_loadbalancer import agent
|
||||
|
||||
|
||||
class TestLbaasService(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(TestLbaasService, self).setUp()
|
||||
self.addCleanup(cfg.CONF.reset)
|
||||
|
||||
cfg.CONF.register_opts(agent.OPTS)
|
||||
|
||||
def test_start(self):
|
||||
with mock.patch.object(
|
||||
agent.rpc_service.Service, 'start'
|
||||
) as mock_start:
|
||||
|
||||
mgr = mock.Mock()
|
||||
agent_service = agent.LbaasAgentService('host', 'topic', mgr)
|
||||
agent_service.start()
|
||||
|
||||
self.assertTrue(mock_start.called)
|
||||
|
||||
def test_main(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(agent.service, 'launch'),
|
||||
mock.patch.object(agent, 'eventlet'),
|
||||
mock.patch('sys.argv'),
|
||||
mock.patch.object(agent.manager, 'LbaasAgentManager')
|
||||
) as (mock_launch, mock_eventlet, sys_argv, mgr_cls):
|
||||
agent.main()
|
||||
|
||||
self.assertTrue(mock_eventlet.monkey_patch.called)
|
||||
mock_launch.assert_called_once_with(mock.ANY)
|
@ -0,0 +1,365 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
||||
|
||||
import contextlib
|
||||
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
from quantum.plugins.services.agent_loadbalancer.agent import manager
|
||||
|
||||
|
||||
class TestLogicalDeviceCache(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(TestLogicalDeviceCache, self).setUp()
|
||||
self.cache = manager.LogicalDeviceCache()
|
||||
|
||||
def test_put(self):
|
||||
fake_device = {
|
||||
'vip': {'port_id': 'port_id'},
|
||||
'pool': {'id': 'pool_id'}
|
||||
}
|
||||
self.cache.put(fake_device)
|
||||
|
||||
self.assertEqual(len(self.cache.devices), 1)
|
||||
self.assertEqual(len(self.cache.port_lookup), 1)
|
||||
self.assertEqual(len(self.cache.pool_lookup), 1)
|
||||
|
||||
def test_double_put(self):
|
||||
fake_device = {
|
||||
'vip': {'port_id': 'port_id'},
|
||||
'pool': {'id': 'pool_id'}
|
||||
}
|
||||
self.cache.put(fake_device)
|
||||
self.cache.put(fake_device)
|
||||
|
||||
self.assertEqual(len(self.cache.devices), 1)
|
||||
self.assertEqual(len(self.cache.port_lookup), 1)
|
||||
self.assertEqual(len(self.cache.pool_lookup), 1)
|
||||
|
||||
def test_remove_in_cache(self):
|
||||
fake_device = {
|
||||
'vip': {'port_id': 'port_id'},
|
||||
'pool': {'id': 'pool_id'}
|
||||
}
|
||||
self.cache.put(fake_device)
|
||||
|
||||
self.assertEqual(len(self.cache.devices), 1)
|
||||
|
||||
self.cache.remove(fake_device)
|
||||
|
||||
self.assertFalse(len(self.cache.devices))
|
||||
self.assertFalse(self.cache.port_lookup)
|
||||
self.assertFalse(self.cache.pool_lookup)
|
||||
|
||||
def test_remove_in_cache_same_object(self):
|
||||
fake_device = {
|
||||
'vip': {'port_id': 'port_id'},
|
||||
'pool': {'id': 'pool_id'}
|
||||
}
|
||||
self.cache.put(fake_device)
|
||||
|
||||
self.assertEqual(len(self.cache.devices), 1)
|
||||
|
||||
self.cache.remove(set(self.cache.devices).pop())
|
||||
|
||||
self.assertFalse(len(self.cache.devices))
|
||||
self.assertFalse(self.cache.port_lookup)
|
||||
self.assertFalse(self.cache.pool_lookup)
|
||||
|
||||
def test_remove_by_pool_id(self):
|
||||
fake_device = {
|
||||
'vip': {'port_id': 'port_id'},
|
||||
'pool': {'id': 'pool_id'}
|
||||
}
|
||||
self.cache.put(fake_device)
|
||||
|
||||
self.assertEqual(len(self.cache.devices), 1)
|
||||
|
||||
self.cache.remove_by_pool_id('pool_id')
|
||||
|
||||
self.assertFalse(len(self.cache.devices))
|
||||
self.assertFalse(self.cache.port_lookup)
|
||||
self.assertFalse(self.cache.pool_lookup)
|
||||
|
||||
def test_get_by_pool_id(self):
|
||||
fake_device = {
|
||||
'vip': {'port_id': 'port_id'},
|
||||
'pool': {'id': 'pool_id'}
|
||||
}
|
||||
self.cache.put(fake_device)
|
||||
|
||||
dev = self.cache.get_by_pool_id('pool_id')
|
||||
|
||||
self.assertEqual(dev.pool_id, 'pool_id')
|
||||
self.assertEqual(dev.port_id, 'port_id')
|
||||
|
||||
def test_get_by_port_id(self):
|
||||
fake_device = {
|
||||
'vip': {'port_id': 'port_id'},
|
||||
'pool': {'id': 'pool_id'}
|
||||
}
|
||||
self.cache.put(fake_device)
|
||||
|
||||
dev = self.cache.get_by_port_id('port_id')
|
||||
|
||||
self.assertEqual(dev.pool_id, 'pool_id')
|
||||
self.assertEqual(dev.port_id, 'port_id')
|
||||
|
||||
def test_get_pool_ids(self):
|
||||
fake_device = {
|
||||
'vip': {'port_id': 'port_id'},
|
||||
'pool': {'id': 'pool_id'}
|
||||
}
|
||||
self.cache.put(fake_device)
|
||||
|
||||
self.assertEqual(self.cache.get_pool_ids(), ['pool_id'])
|
||||
|
||||
|
||||
class TestManager(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(TestManager, self).setUp()
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
mock_conf = mock.Mock()
|
||||
mock_conf.interface_driver = 'intdriver'
|
||||
mock_conf.device_driver = 'devdriver'
|
||||
mock_conf.AGENT.root_helper = 'sudo'
|
||||
mock_conf.loadbalancer_state_path = '/the/path'
|
||||
|
||||
self.mock_importer = mock.patch.object(manager, 'importutils').start()
|
||||
|
||||
rpc_mock_cls = mock.patch(
|
||||
'quantum.plugins.services.agent_loadbalancer.agent.api'
|
||||
'.LbaasAgentApi'
|
||||
).start()
|
||||
|
||||
self.mgr = manager.LbaasAgentManager(mock_conf)
|
||||
self.rpc_mock = rpc_mock_cls.return_value
|
||||
self.log = mock.patch.object(manager, 'LOG').start()
|
||||
self.mgr.needs_resync = False
|
||||
|
||||
def test_initialize_service_hook(self):
|
||||
with mock.patch.object(self.mgr, 'sync_state') as sync:
|
||||
self.mgr.initialize_service_hook(mock.Mock())
|
||||
sync.assert_called_once_with()
|
||||
|
||||
def test_periodic_resync_needs_sync(self):
|
||||
with mock.patch.object(self.mgr, 'sync_state') as sync:
|
||||
self.mgr.needs_resync = True
|
||||
self.mgr.periodic_resync(mock.Mock())
|
||||
sync.assert_called_once_with()
|
||||
|
||||
def test_periodic_resync_no_sync(self):
|
||||
with mock.patch.object(self.mgr, 'sync_state') as sync:
|
||||
self.mgr.needs_resync = False
|
||||
self.mgr.periodic_resync(mock.Mock())
|
||||
self.assertFalse(sync.called)
|
||||
|
||||
def test_collect_stats(self):
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
cache.get_pool_ids.return_value = ['1', '2']
|
||||
self.mgr.collect_stats(mock.Mock())
|
||||
self.rpc_mock.update_pool_stats.assert_has_calls([
|
||||
mock.call('1', mock.ANY),
|
||||
mock.call('2', mock.ANY)
|
||||
])
|
||||
|
||||
def test_collect_stats_exception(self):
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
cache.get_pool_ids.return_value = ['1', '2']
|
||||
with mock.patch.object(self.mgr, 'driver') as driver:
|
||||
driver.get_stats.side_effect = Exception
|
||||
|
||||
self.mgr.collect_stats(mock.Mock())
|
||||
|
||||
self.assertFalse(self.rpc_mock.called)
|
||||
self.assertTrue(self.mgr.needs_resync)
|
||||
self.assertTrue(self.log.exception.called)
|
||||
|
||||
def test_vip_plug_callback(self):
|
||||
self.mgr._vip_plug_callback('plug', {'id': 'id'})
|
||||
self.rpc_mock.plug_vip_port.assert_called_once_with('id')
|
||||
|
||||
def test_vip_unplug_callback(self):
|
||||
self.mgr._vip_plug_callback('unplug', {'id': 'id'})
|
||||
self.rpc_mock.unplug_vip_port.assert_called_once_with('id')
|
||||
|
||||
def _sync_state_helper(self, cache, ready, refreshed, destroyed):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.mgr, 'cache'),
|
||||
mock.patch.object(self.mgr, 'refresh_device'),
|
||||
mock.patch.object(self.mgr, 'destroy_device')
|
||||
) as (mock_cache, refresh, destroy):
|
||||
|
||||
mock_cache.get_pool_ids.return_value = cache
|
||||
self.rpc_mock.get_ready_devices.return_value = ready
|
||||
|
||||
self.mgr.sync_state()
|
||||
|
||||
self.assertEqual(len(refreshed), len(refresh.mock_calls))
|
||||
self.assertEqual(len(destroyed), len(destroy.mock_calls))
|
||||
|
||||
refresh.assert_has_calls([mock.call(i) for i in refreshed])
|
||||
destroy.assert_has_calls([mock.call(i) for i in destroyed])
|
||||
self.assertFalse(self.mgr.needs_resync)
|
||||
|
||||
def test_sync_state_all_known(self):
|
||||
self._sync_state_helper(['1', '2'], ['1', '2'], ['1', '2'], [])
|
||||
|
||||
def test_sync_state_all_unknown(self):
|
||||
self._sync_state_helper([], ['1', '2'], ['1', '2'], [])
|
||||
|
||||
def test_sync_state_destroy_all(self):
|
||||
self._sync_state_helper(['1', '2'], [], [], ['1', '2'])
|
||||
|
||||
def test_sync_state_both(self):
|
||||
self._sync_state_helper(['1'], ['2'], ['2'], ['1'])
|
||||
|
||||
def test_sync_state_exception(self):
|
||||
self.rpc_mock.get_ready_devices.side_effect = Exception
|
||||
|
||||
self.mgr.sync_state()
|
||||
|
||||
self.assertTrue(self.log.exception.called)
|
||||
self.assertTrue(self.mgr.needs_resync)
|
||||
|
||||
def test_refresh_device_exists(self):
|
||||
config = self.rpc_mock.get_logical_device.return_value
|
||||
|
||||
with mock.patch.object(self.mgr, 'driver') as driver:
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
driver.exists.return_value = True
|
||||
|
||||
self.mgr.refresh_device(config)
|
||||
|
||||
driver.exists.assert_called_once_with(config)
|
||||
driver.update.assert_called_once_with(config)
|
||||
cache.put.assert_called_once_with(config)
|
||||
self.assertFalse(self.mgr.needs_resync)
|
||||
|
||||
def test_refresh_device_new(self):
|
||||
config = self.rpc_mock.get_logical_device.return_value
|
||||
|
||||
with mock.patch.object(self.mgr, 'driver') as driver:
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
driver.exists.return_value = False
|
||||
|
||||
self.mgr.refresh_device(config)
|
||||
|
||||
driver.exists.assert_called_once_with(config)
|
||||
driver.create.assert_called_once_with(config)
|
||||
cache.put.assert_called_once_with(config)
|
||||
self.assertFalse(self.mgr.needs_resync)
|
||||
|
||||
def test_refresh_device_exception(self):
|
||||
config = self.rpc_mock.get_logical_device.return_value
|
||||
|
||||
with mock.patch.object(self.mgr, 'driver') as driver:
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
driver.exists.side_effect = Exception
|
||||
self.mgr.refresh_device(config)
|
||||
|
||||
driver.exists.assert_called_once_with(config)
|
||||
self.assertTrue(self.mgr.needs_resync)
|
||||
self.assertTrue(self.log.exception.called)
|
||||
self.assertFalse(cache.put.called)
|
||||
|
||||
def test_destroy_device_known(self):
|
||||
with mock.patch.object(self.mgr, 'driver') as driver:
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
cache.get_by_pool_id.return_value = True
|
||||
|
||||
self.mgr.destroy_device('pool_id')
|
||||
cache.get_by_pool_id.assert_called_once_with('pool_id')
|
||||
driver.destroy.assert_called_once_with('pool_id')
|
||||
self.rpc_mock.pool_destroyed.assert_called_once_with(
|
||||
'pool_id'
|
||||
)
|
||||
cache.remove.assert_called_once_with(True)
|
||||
self.assertFalse(self.mgr.needs_resync)
|
||||
|
||||
def test_destroy_device_unknown(self):
|
||||
with mock.patch.object(self.mgr, 'driver') as driver:
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
cache.get_by_pool_id.return_value = None
|
||||
|
||||
self.mgr.destroy_device('pool_id')
|
||||
cache.get_by_pool_id.assert_called_once_with('pool_id')
|
||||
self.assertFalse(driver.destroy.called)
|
||||
|
||||
def test_destroy_device_exception(self):
|
||||
with mock.patch.object(self.mgr, 'driver') as driver:
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
cache.get_by_pool_id.return_value = True
|
||||
driver.destroy.side_effect = Exception
|
||||
|
||||
self.mgr.destroy_device('pool_id')
|
||||
cache.get_by_pool_id.assert_called_once_with('pool_id')
|
||||
|
||||
self.assertTrue(self.log.exception.called)
|
||||
self.assertTrue(self.mgr.needs_resync)
|
||||
|
||||
def test_remove_orphans(self):
|
||||
with mock.patch.object(self.mgr, 'driver') as driver:
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
cache.get_pool_ids.return_value = ['1', '2']
|
||||
self.mgr.remove_orphans()
|
||||
|
||||
driver.remove_orphans.assert_called_once_with(['1', '2'])
|
||||
|
||||
def test_reload_pool(self):
|
||||
with mock.patch.object(self.mgr, 'refresh_device') as refresh:
|
||||
self.mgr.reload_pool(mock.Mock(), pool_id='pool_id')
|
||||
refresh.assert_called_once_with('pool_id')
|
||||
|
||||
def test_modify_pool_known(self):
|
||||
with mock.patch.object(self.mgr, 'refresh_device') as refresh:
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
cache.get_by_pool_id.return_value = True
|
||||
|
||||
self.mgr.reload_pool(mock.Mock(), pool_id='pool_id')
|
||||
|
||||
refresh.assert_called_once_with('pool_id')
|
||||
|
||||
def test_modify_pool_unknown(self):
|
||||
with mock.patch.object(self.mgr, 'refresh_device') as refresh:
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
cache.get_by_pool_id.return_value = False
|
||||
|
||||
self.mgr.modify_pool(mock.Mock(), pool_id='pool_id')
|
||||
|
||||
self.assertFalse(refresh.called)
|
||||
|
||||
def test_destroy_pool_known(self):
|
||||
with mock.patch.object(self.mgr, 'destroy_device') as destroy:
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
cache.get_by_pool_id.return_value = True
|
||||
|
||||
self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id')
|
||||
|
||||
destroy.assert_called_once_with('pool_id')
|
||||
|
||||
def test_destroy_pool_unknown(self):
|
||||
with mock.patch.object(self.mgr, 'destroy_device') as destroy:
|
||||
with mock.patch.object(self.mgr, 'cache') as cache:
|
||||
cache.get_by_pool_id.return_value = False
|
||||
|
||||
self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id')
|
||||
|
||||
self.assertFalse(destroy.called)
|
@ -0,0 +1,17 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
@ -0,0 +1,17 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
@ -0,0 +1,131 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
||||
|
||||
import contextlib
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
|
||||
namespace_driver
|
||||
)
|
||||
|
||||
|
||||
class TestHaproxyNSDriver(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(TestHaproxyNSDriver, self).setUp()
|
||||
|
||||
self.vif_driver = mock.Mock()
|
||||
self.vip_plug_callback = mock.Mock()
|
||||
|
||||
self.driver = namespace_driver.HaproxyNSDriver(
|
||||
'sudo',
|
||||
'/the/path',
|
||||
self.vif_driver,
|
||||
self.vip_plug_callback
|
||||
)
|
||||
|
||||
self.fake_config = {
|
||||
'pool': {'id': 'pool_id'},
|
||||
'vip': {'id': 'vip_id', 'port': {'id': 'port_id'}}
|
||||
}
|
||||
|
||||
def test_create(self):
|
||||
with mock.patch.object(self.driver, '_plug') as plug:
|
||||
with mock.patch.object(self.driver, '_spawn') as spawn:
|
||||
self.driver.create(self.fake_config)
|
||||
|
||||
plug.assert_called_once_with(
|
||||
'qlbaas-pool_id', {'id': 'port_id'}
|
||||
)
|
||||
spawn.assert_called_once_with(self.fake_config)
|
||||
|
||||
def test_update(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.driver, '_get_state_file_path'),
|
||||
mock.patch.object(self.driver, '_spawn'),
|
||||
mock.patch('__builtin__.open')
|
||||
) as (gsp, spawn, mock_open):
|
||||
mock_open.return_value = ['5']
|
||||
|
||||
self.driver.update(self.fake_config)
|
||||
|
||||
mock_open.assert_called_once_with(gsp.return_value, 'r')
|
||||
spawn.assert_called_once_with(self.fake_config, ['-sf', '5'])
|
||||
|
||||
def test_spawn(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(namespace_driver.hacfg, 'save_config'),
|
||||
mock.patch.object(self.driver, '_get_state_file_path'),
|
||||
mock.patch('quantum.agent.linux.ip_lib.IPWrapper')
|
||||
) as (mock_save, gsp, ip_wrap):
|
||||
gsp.side_effect = lambda x, y: y
|
||||
|
||||
self.driver._spawn(self.fake_config)
|
||||
|
||||
mock_save.assert_called_once_with('conf', self.fake_config, 'sock')
|
||||
cmd = ['haproxy', '-f', 'conf', '-p', 'pid']
|
||||
ip_wrap.assert_has_calls([
|
||||
mock.call('sudo', 'qlbaas-pool_id'),
|
||||
mock.call().netns.execute(cmd)
|
||||
])
|
||||
|
||||
def test_destroy(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.driver, '_get_state_file_path'),
|
||||
mock.patch.object(namespace_driver, 'kill_pids_in_file'),
|
||||
mock.patch.object(self.driver, '_unplug'),
|
||||
mock.patch('quantum.agent.linux.ip_lib.IPWrapper'),
|
||||
mock.patch('os.path.isdir'),
|
||||
mock.patch('shutil.rmtree')
|
||||
) as (gsp, kill, unplug, ip_wrap, isdir, rmtree):
|
||||
gsp.side_effect = lambda x, y: '/pool/' + y
|
||||
|
||||
self.driver.pool_to_port_id['pool_id'] = 'port_id'
|
||||
isdir.return_value = True
|
||||
|
||||
self.driver.destroy('pool_id')
|
||||
|
||||
kill.assert_called_once_with(ip_wrap(), '/pool/pid')
|
||||
unplug.assert_called_once_with('qlbaas-pool_id', 'port_id')
|
||||
isdir.called_once_with('/pool')
|
||||
rmtree.assert_called_once_with('/pool')
|
||||
ip_wrap.assert_has_calls([
|
||||
mock.call('sudo', 'qlbaas-pool_id'),
|
||||
mock.call().garbage_collect_namespace()
|
||||
])
|
||||
|
||||
def test_exists(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.driver, '_get_state_file_path'),
|
||||
mock.patch('quantum.agent.linux.ip_lib.IPWrapper'),
|
||||
mock.patch('socket.socket'),
|
||||
mock.patch('os.path.exists'),
|
||||
) as (gsp, ip_wrap, socket, path_exists):
|
||||
gsp.side_effect = lambda x, y: '/pool/' + y
|
||||
|
||||
ip_wrap.return_value.netns.exists.return_value = True
|
||||
path_exists.return_value = True
|
||||
|
||||
self.driver.exists('pool_id')
|
||||
|
||||
ip_wrap.assert_has_calls([
|
||||
mock.call('sudo'),
|
||||
mock.call().netns.exists('qlbaas-pool_id')
|
||||
])
|
||||
|
||||
self.assertTrue(self.driver.exists('pool_id'))
|
263
quantum/tests/unit/services/agent_loadbalancer/test_plugin.py
Normal file
263
quantum/tests/unit/services/agent_loadbalancer/test_plugin.py
Normal file
@ -0,0 +1,263 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Mark McClain, DreamHost
|
||||
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
from quantum import context
|
||||
from quantum import manager
|
||||
from quantum.plugins.common import constants
|
||||
from quantum.plugins.services.agent_loadbalancer import plugin
|
||||
from quantum.tests.unit.db.loadbalancer import test_db_loadbalancer
|
||||
|
||||
|
||||
class TestLoadBalancerPluginBase(
|
||||
test_db_loadbalancer.LoadBalancerPluginDbTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestLoadBalancerPluginBase, self).setUp()
|
||||
|
||||
# create another API instance to make testing easier
|
||||
# pass a mock to our API instance
|
||||
|
||||
# we need access to loaded plugins to modify models
|
||||
loaded_plugins = manager.QuantumManager().get_service_plugins()
|
||||
self.plugin_instance = loaded_plugins[constants.LOADBALANCER]
|
||||
self.callbacks = self.plugin_instance.callbacks
|
||||
|
||||
|
||||
class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
|
||||
def test_get_ready_devices(self):
|
||||
with self.vip() as vip:
|
||||
ready = self.callbacks.get_ready_devices(
|
||||
context.get_admin_context(),
|
||||
)
|
||||
self.assertEqual(ready, [vip['vip']['pool_id']])
|
||||
|
||||
def test_get_ready_devices_inactive_vip(self):
|
||||
with self.vip() as vip:
|
||||
|
||||
# set the vip inactive need to use plugin directly since
|
||||
# status is not tenant mutable
|
||||
self.plugin_instance.update_vip(
|
||||
context.get_admin_context(),
|
||||
vip['vip']['id'],
|
||||
{'vip': {'status': constants.INACTIVE}}
|
||||
)
|
||||
|
||||
ready = self.callbacks.get_ready_devices(
|
||||
context.get_admin_context(),
|
||||
)
|
||||
self.assertFalse(ready)
|
||||
|
||||
def test_get_ready_devices_inactive_pool(self):
|
||||
with self.vip() as vip:
|
||||
|
||||
# set the pool inactive need to use plugin directly since
|
||||
# status is not tenant mutable
|
||||
self.plugin_instance.update_pool(
|
||||
context.get_admin_context(),
|
||||
vip['vip']['pool_id'],
|
||||
{'pool': {'status': constants.INACTIVE}}
|
||||
)
|
||||
|
||||
ready = self.callbacks.get_ready_devices(
|
||||
context.get_admin_context(),
|
||||
)
|
||||
self.assertFalse(ready)
|
||||
|
||||
def test_get_logical_device_inactive(self):
|
||||
with self.pool() as pool:
|
||||
with self.vip(pool=pool) as vip:
|
||||
with self.member(pool_id=vip['vip']['pool_id']) as member:
|
||||
self.assertRaises(
|
||||
Exception,
|
||||
self.callbacks.get_logical_device,
|
||||
context.get_admin_context(),
|
||||
pool['pool']['id'],
|
||||
activate=False
|
||||
)
|
||||
|
||||
def test_get_logical_device_activate(self):
|
||||
with self.pool() as pool:
|
||||
with self.vip(pool=pool) as vip:
|
||||
with self.member(pool_id=vip['vip']['pool_id']) as member:
|
||||
ctx = context.get_admin_context()
|
||||
|
||||
# build the expected
|
||||
port = self.plugin_instance._core_plugin.get_port(
|
||||
ctx, vip['vip']['port_id']
|
||||
)
|
||||
subnet = self.plugin_instance._core_plugin.get_subnet(
|
||||
ctx, vip['vip']['subnet_id']
|
||||
)
|
||||
port['fixed_ips'][0]['subnet'] = subnet
|
||||
|
||||
# reload pool to add members and vip
|
||||
pool = self.plugin_instance.get_pool(
|
||||
ctx, pool['pool']['id']
|
||||
)
|
||||
|
||||
pool['status'] = constants.ACTIVE
|
||||
vip['vip']['status'] = constants.ACTIVE
|
||||
vip['vip']['port'] = port
|
||||
member['member']['status'] = constants.ACTIVE
|
||||
|
||||
expected = {
|
||||
'pool': pool,
|
||||
'vip': vip['vip'],
|
||||
'members': [member['member']],
|
||||
'healthmonitors': []
|
||||
}
|
||||
|
||||
logical_config = self.callbacks.get_logical_device(
|
||||
ctx, pool['id'], activate=True
|
||||
)
|
||||
|
||||
self.assertEqual(logical_config, expected)
|
||||
|
||||
def _update_port_test_helper(self, expected, func, **kwargs):
|
||||
core = self.plugin_instance._core_plugin
|
||||
|
||||
with self.pool() as pool:
|
||||
with self.vip(pool=pool) as vip:
|
||||
with self.member(pool_id=vip['vip']['pool_id']) as member:
|
||||
ctx = context.get_admin_context()
|
||||
func(ctx, port_id=vip['vip']['port_id'], **kwargs)
|
||||
|
||||
db_port = core.get_port(ctx, vip['vip']['port_id'])
|
||||
|
||||
for k, v in expected.iteritems():
|
||||
self.assertEqual(db_port[k], v)
|
||||
|
||||
def test_plug_vip_port(self):
|
||||
exp = {
|
||||
'device_owner': 'quantum:' + constants.LOADBALANCER,
|
||||
'device_id': 'c596ce11-db30-5c72-8243-15acaae8690f',
|
||||
'admin_state_up': True
|
||||
}
|
||||
self._update_port_test_helper(
|
||||
exp,
|
||||
self.callbacks.plug_vip_port,
|
||||
host='host'
|
||||
)
|
||||
|
||||
def test_unplug_vip_port(self):
|
||||
exp = {
|
||||
'device_owner': '',
|
||||
'device_id': '',
|
||||
'admin_state_up': False
|
||||
}
|
||||
self._update_port_test_helper(
|
||||
exp,
|
||||
self.callbacks.unplug_vip_port,
|
||||
host='host'
|
||||
)
|
||||
|
||||
|
||||
class TestLoadBalancerAgentApi(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(TestLoadBalancerAgentApi, self).setUp()
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
self.api = plugin.LoadBalancerAgentApi('topic', 'host')
|
||||
self.mock_cast = mock.patch.object(self.api, 'cast').start()
|
||||
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
|
||||
|
||||
def test_init(self):
|
||||
self.assertEqual(self.api.topic, 'topic')
|
||||
self.assertEqual(self.api.host, 'host')
|
||||
|
||||
def _call_test_helper(self, method_name):
|
||||
rv = getattr(self.api, method_name)(mock.sentinel.context, 'the_id')
|
||||
self.assertEqual(rv, self.mock_cast.return_value)
|
||||
self.mock_cast.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.mock_msg.return_value,
|
||||
topic='topic'
|
||||
)
|
||||
|
||||
self.mock_msg.assert_called_once_with(
|
||||
method_name,
|
||||
pool_id='the_id',
|
||||
host='host'
|
||||
)
|
||||
|
||||
def test_reload_pool(self):
|
||||
self._call_test_helper('reload_pool')
|
||||
|
||||
def test_destroy_pool(self):
|
||||
self._call_test_helper('destroy_pool')
|
||||
|
||||
def test_modify_pool(self):
|
||||
self._call_test_helper('modify_pool')
|
||||
|
||||
|
||||
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
def setUp(self):
|
||||
self.log = mock.patch.object(plugin, 'LOG')
|
||||
api_cls = mock.patch.object(plugin, 'LoadBalancerAgentApi').start()
|
||||
super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
|
||||
self.mock_api = api_cls.return_value
|
||||
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
def test_create_vip(self):
|
||||
with self.subnet() as subnet:
|
||||
with self.pool(subnet=subnet) as pool:
|
||||
with self.vip(pool=pool, subnet=subnet) as vip:
|
||||
self.mock_api.reload_pool.assert_called_once_with(
|
||||
mock.ANY,
|
||||
vip['vip']['pool_id']
|
||||
)
|
||||
|
||||
def test_update_vip(self):
|
||||
with self.subnet() as subnet:
|
||||
with self.pool(subnet=subnet) as pool:
|
||||
with self.vip(pool=pool, subnet=subnet) as vip:
|
||||
self.mock_api.reset_mock()
|
||||
ctx = context.get_admin_context()
|
||||
vip['vip'].pop('status')
|
||||
new_vip = self.plugin_instance.update_vip(
|
||||
ctx,
|
||||
vip['vip']['id'],
|
||||
vip
|
||||
)
|
||||
|
||||
self.mock_api.reload_pool.assert_called_once_with(
|
||||
mock.ANY,
|
||||
vip['vip']['pool_id']
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
new_vip['status'],
|
||||
constants.PENDING_UPDATE
|
||||
)
|
||||
|
||||
def t2est_delete_vip(self):
|
||||
with self.subnet() as subnet:
|
||||
with self.pool(subnet=subnet) as pool:
|
||||
with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip:
|
||||
self.mock_api.reset_mock()
|
||||
ctx = context.get_admin_context()
|
||||
self.plugin_instance.delete_vip(context, vip['vip']['id'])
|
||||
self.mock_api.destroy_pool.assert_called_once_with(
|
||||
mock.ANY,
|
||||
vip['vip']['pool_id']
|
||||
)
|
@ -70,3 +70,21 @@ class AgentUtilsGetInterfaceMAC(testtools.TestCase):
|
||||
'\x00' * 232])
|
||||
actual_val = utils.get_interface_mac('eth0')
|
||||
self.assertEqual(actual_val, expect_val)
|
||||
|
||||
|
||||
class AgentUtilsReplaceFile(testtools.TestCase):
|
||||
def test_replace_file(self):
|
||||
# make file to replace
|
||||
with mock.patch('tempfile.NamedTemporaryFile') as ntf:
|
||||
ntf.return_value.name = '/baz'
|
||||
with mock.patch('os.chmod') as chmod:
|
||||
with mock.patch('os.rename') as rename:
|
||||
utils.replace_file('/foo', 'bar')
|
||||
|
||||
expected = [mock.call('w+', dir='/', delete=False),
|
||||
mock.call().write('bar'),
|
||||
mock.call().close()]
|
||||
|
||||
ntf.assert_has_calls(expected)
|
||||
chmod.assert_called_once_with('/baz', 0644)
|
||||
rename.assert_called_once_with('/baz', '/foo')
|
||||
|
@ -138,22 +138,6 @@ class TestDhcpBase(testtools.TestCase):
|
||||
def test_base_abc_error(self):
|
||||
self.assertRaises(TypeError, dhcp.DhcpBase, None)
|
||||
|
||||
def test_replace_file(self):
|
||||
# make file to replace
|
||||
with mock.patch('tempfile.NamedTemporaryFile') as ntf:
|
||||
ntf.return_value.name = '/baz'
|
||||
with mock.patch('os.chmod') as chmod:
|
||||
with mock.patch('os.rename') as rename:
|
||||
dhcp.replace_file('/foo', 'bar')
|
||||
|
||||
expected = [mock.call('w+', dir='/', delete=False),
|
||||
mock.call().write('bar'),
|
||||
mock.call().close()]
|
||||
|
||||
ntf.assert_has_calls(expected)
|
||||
chmod.assert_called_once_with('/baz', 0644)
|
||||
rename.assert_called_once_with('/baz', '/foo')
|
||||
|
||||
def test_restart(self):
|
||||
class SubClass(dhcp.DhcpBase):
|
||||
def __init__(self):
|
||||
@ -212,7 +196,7 @@ class TestBase(testtools.TestCase):
|
||||
self.conf.set_override('state_path', '')
|
||||
self.conf.use_namespaces = True
|
||||
|
||||
self.replace_p = mock.patch('quantum.agent.linux.dhcp.replace_file')
|
||||
self.replace_p = mock.patch('quantum.agent.linux.utils.replace_file')
|
||||
self.execute_p = mock.patch('quantum.agent.linux.utils.execute')
|
||||
self.addCleanup(self.execute_p.stop)
|
||||
self.safe = self.replace_p.start()
|
||||
@ -392,7 +376,7 @@ class TestDhcpLocalProcess(TestBase):
|
||||
self.assertEqual(lp.interface_name, 'tap0')
|
||||
|
||||
def test_set_interface_name(self):
|
||||
with mock.patch('quantum.agent.linux.dhcp.replace_file') as replace:
|
||||
with mock.patch('quantum.agent.linux.utils.replace_file') as replace:
|
||||
lp = LocalChild(self.conf, FakeDualNetwork())
|
||||
with mock.patch.object(lp, 'get_conf_file_name') as conf_file:
|
||||
conf_file.return_value = '/interface'
|
||||
|
2
setup.py
2
setup.py
@ -137,6 +137,8 @@ else:
|
||||
'quantum-debug = quantum.debug.shell:main',
|
||||
'quantum-ovs-cleanup = quantum.agent.ovs_cleanup_util:main',
|
||||
'quantum-db-manage = quantum.db.migration.cli:main',
|
||||
('quantum-lbaas-agent = '
|
||||
'quantum.plugins.services.agent_loadbalancer.agent:main'),
|
||||
('quantum-check-nvp-config = '
|
||||
'quantum.plugins.nicira.nicira_nvp_plugin.check_nvp_config:main'),
|
||||
]
|
||||
|
Loading…
Reference in New Issue
Block a user