From 83d9b3abddf8b255ceff01dc93676ee451348d6c Mon Sep 17 00:00:00 2001 From: Kobi Samoray Date: Thu, 24 May 2018 10:15:26 +0300 Subject: [PATCH] NSX|V+V3: Octavia driver Implementing the Octavia support for NSX-V & NSX-T. Follow up patches will handle the TVD plugin, Status updates, and migration. Since Octavia is not (yet?) in the requirements, using a hack to allow unittests to be skipped. Co-Authored-by: Adit Sarfaty Change-Id: Iadb24e7eadcab658faf3e646cc528c2a8a6976e5 --- .zuul.yaml | 1 + devstack/lib/nsx_common | 2 +- doc/source/devstack.rst | 24 + lower-constraints.txt | 1 + .../octavia-support-2fa83d464dbc4e52.yaml | 6 + requirements.txt | 1 + setup.cfg | 3 +- tox.ini | 1 + vmware_nsx/common/config.py | 4 + vmware_nsx/db/db.py | 16 +- .../alembic_migrations/versions/EXPAND_HEAD | 2 +- .../fc6308289aca_lbaas_no_foreign_key.py | 51 ++ vmware_nsx/db/nsx_models.py | 37 +- vmware_nsx/db/nsxv_db.py | 18 + vmware_nsx/db/nsxv_models.py | 30 +- .../nsx_v/drivers/exclusive_router_driver.py | 9 +- vmware_nsx/plugins/nsx_v/plugin.py | 40 +- vmware_nsx/plugins/nsx_v3/plugin.py | 54 +- vmware_nsx/services/lbaas/lb_const.py | 4 + vmware_nsx/services/lbaas/lb_translators.py | 2 + .../nsx_v/implementation/listener_mgr.py | 44 ++ .../nsx_v/implementation/loadbalancer_mgr.py | 54 +- .../lbaas/nsx_v3/implementation/l7rule_mgr.py | 2 + .../lbaas/nsx_v3/implementation/lb_utils.py | 5 + .../nsx_v3/implementation/listener_mgr.py | 41 ++ .../nsx_v3/implementation/loadbalancer_mgr.py | 6 +- .../lbaas/nsx_v3/implementation/member_mgr.py | 13 +- .../services/lbaas/nsx_v3/v2/lb_driver_v2.py | 11 +- vmware_nsx/services/lbaas/octavia/__init__.py | 0 .../services/lbaas/octavia/constants.py | 45 ++ .../services/lbaas/octavia/octavia_driver.py | 507 ++++++++++++++++++ .../lbaas/octavia/octavia_listener.py | 368 +++++++++++++ .../unit/services/lbaas/test_nsxv3_driver.py | 3 +- .../services/lbaas/test_octavia_driver.py | 477 ++++++++++++++++ .../services/lbaas/test_octavia_listener.py | 301 +++++++++++ .../services/qos/test_nsxv_notification.py | 11 +- 36 files changed, 2090 insertions(+), 104 deletions(-) create mode 100644 releasenotes/notes/octavia-support-2fa83d464dbc4e52.yaml create mode 100644 vmware_nsx/db/migration/alembic_migrations/versions/stein/expand/fc6308289aca_lbaas_no_foreign_key.py create mode 100644 vmware_nsx/services/lbaas/octavia/__init__.py create mode 100644 vmware_nsx/services/lbaas/octavia/constants.py create mode 100644 vmware_nsx/services/lbaas/octavia/octavia_driver.py create mode 100644 vmware_nsx/services/lbaas/octavia/octavia_listener.py create mode 100644 vmware_nsx/tests/unit/services/lbaas/test_octavia_driver.py create mode 100644 vmware_nsx/tests/unit/services/lbaas/test_octavia_listener.py diff --git a/.zuul.yaml b/.zuul.yaml index 8fa1848b17..45f17da695 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -131,3 +131,4 @@ - openstack/neutron-dynamic-routing - openstack/neutron-vpnaas - openstack/tap-as-a-service + - openstack/octavia diff --git a/devstack/lib/nsx_common b/devstack/lib/nsx_common index afdcfdf2fa..7e28a2461e 100644 --- a/devstack/lib/nsx_common +++ b/devstack/lib/nsx_common @@ -30,7 +30,7 @@ function _nsxv_ini_set { } function install_neutron_projects { - pkg_list="networking-l2gw networking-sfc neutron-lbaas neutron-fwaas neutron-dynamic-routing neutron-vpnaas vmware-nsxlib" + pkg_list="networking-l2gw networking-sfc neutron-lbaas neutron-fwaas neutron-dynamic-routing neutron-vpnaas octavia vmware-nsxlib" for pkg in `echo $pkg_list` do if is_plugin_enabled $pkg; then diff --git a/doc/source/devstack.rst b/doc/source/devstack.rst index f8f8ee6d9c..3fc2edf445 100644 --- a/doc/source/devstack.rst +++ b/doc/source/devstack.rst @@ -238,6 +238,30 @@ Add neutron-vpnaas repo as an external repository and configure following flags [DEFAULT] api_extensions_path = $DEST/neutron-vpnaas/neutron_vpnaas/extensions +Octavia +~~~~~~~ + +Add octavia repo as an external repository and configure following flags in ``local.conf``:: + + [[local|localrc]] + OCTAVIA_NODE=api + DISABLE_AMP_IMAGE_BUILD=True + enable_plugin octavia $GIT_BASE/openstack/octavia.git + enable_plugin octavia-dashboard $GIT_BASE/openstack/octavia-dashboard + enable_service octavia + enable_service o-api + + [[post-config|$OCTAVIA_CONF]] + [DEFAULT] + verbose = True + debug = True + + [api_settings] + default_provider_driver=vmwareedge + enabled_provider_drivers=vmwareedge:NSX + + [oslo_messaging] + topic=vmwarensxv_edge_lb NSX-TVD ------- diff --git a/lower-constraints.txt b/lower-constraints.txt index f50cf5181a..ba4e9224d9 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -55,6 +55,7 @@ munch==2.1.0 netaddr==0.7.18 netifaces==0.10.4 neutron-lib==1.18.0 +octavia==3.0.0 openstackdocstheme==1.18.1 openstacksdk==0.11.2 os-client-config==1.28.0 diff --git a/releasenotes/notes/octavia-support-2fa83d464dbc4e52.yaml b/releasenotes/notes/octavia-support-2fa83d464dbc4e52.yaml new file mode 100644 index 0000000000..e6fd5f06c6 --- /dev/null +++ b/releasenotes/notes/octavia-support-2fa83d464dbc4e52.yaml @@ -0,0 +1,6 @@ +--- +prelude: > + Support Octavia loadbalancer support in NSXv and NSXv3 plugins. +features: + - | + NSXv and NSXv3 plugins now support Octavia loadbalancer. diff --git a/requirements.txt b/requirements.txt index 45818cde18..e54c413217 100644 --- a/requirements.txt +++ b/requirements.txt @@ -38,6 +38,7 @@ neutron-fwaas>=12.0.0 # Apache-2.0 neutron-vpnaas>=12.0.0 # Apache-2.0 neutron-dynamic-routing>=12.0.0 # Apache-2.0 vmware-nsxlib>=12.0.0 # Apache-2.0 +#octavia>=3.0.0 # Apache-2.0 # The comment below indicates this project repo is current with neutron-lib # and should receive neutron-lib consumption patches as they are released diff --git a/setup.cfg b/setup.cfg index 9fa2cdcb95..962bbbed92 100644 --- a/setup.cfg +++ b/setup.cfg @@ -94,7 +94,8 @@ vmware_nsx.neutron.nsxv3.housekeeper.jobs = orphaned_logical_router = vmware_nsx.plugins.nsx_v3.housekeeper.orphaned_logical_router:OrphanedLogicalRouterJob orphaned_firewall_section = vmware_nsx.plugins.nsx_v3.housekeeper.orphaned_firewall_section:OrphanedFirewallSectionJob mismatch_logical_port = vmware_nsx.plugins.nsx_v3.housekeeper.mismatch_logical_port:MismatchLogicalportJob - +octavia.api.drivers = + vmwareedge = vmware_nsx.services.lbaas.octavia.octavia_driver:NSXOctaviaDriver [build_sphinx] source-dir = doc/source build-dir = doc/build diff --git a/tox.ini b/tox.ini index 4bfcdcdc73..20ee42e4e2 100644 --- a/tox.ini +++ b/tox.ini @@ -34,6 +34,7 @@ commands = pip install -q -e "git+https://git.openstack.org/openstack/neutron-fwaas#egg=neutron_fwaas" pip install -q -e "git+https://git.openstack.org/openstack/neutron-dynamic-routing#egg=neutron_dynamic_routing" pip install -q -e "git+https://git.openstack.org/openstack/neutron-vpnaas#egg=neutron_vpnaas" + pip install -q -e "git+https://git.openstack.org/openstack/octavia#egg=octavia" pip install -q -e "git+https://git.openstack.org/openstack/vmware-nsxlib#egg=vmware_nsxlib" pip install -q -e "git+https://git.openstack.org/openstack/neutron#egg=neutron" diff --git a/vmware_nsx/common/config.py b/vmware_nsx/common/config.py index 70418fb895..1daa80f2bd 100644 --- a/vmware_nsx/common/config.py +++ b/vmware_nsx/common/config.py @@ -262,6 +262,10 @@ nsx_common_opts = [ default=[], help=_("(Optional) List of email addresses for " "notifications.")), + cfg.IntOpt('octavia_stats_interval', + default=10, + help=_("Interval in seconds for Octavia statistics reporting. " + "0 means no reporting")), ] nsx_v3_and_p = [ diff --git a/vmware_nsx/db/db.py b/vmware_nsx/db/db.py index 83781ebe37..d840f44ca8 100644 --- a/vmware_nsx/db/db.py +++ b/vmware_nsx/db/db.py @@ -559,6 +559,10 @@ def get_nsx_lbaas_loadbalancer_binding(session, loadbalancer_id): return +def get_nsx_lbaas_loadbalancer_bindings(session): + return session.query(nsx_models.NsxLbaasLoadbalancer).all() + + def get_nsx_lbaas_loadbalancer_binding_by_service(session, lb_service_id): return session.query( nsx_models.NsxLbaasLoadbalancer).filter_by( @@ -591,7 +595,8 @@ def get_nsx_lbaas_listener_binding(session, loadbalancer_id, listener_id): return -def get_nsx_lbaas_listener_binding_by_vs(session, loadbalancer_id, lb_vs_id): +def get_nsx_lbaas_listener_binding_by_lb_and_vs(session, loadbalancer_id, + lb_vs_id): try: return session.query( nsx_models.NsxLbaasListener).filter_by( @@ -601,6 +606,15 @@ def get_nsx_lbaas_listener_binding_by_vs(session, loadbalancer_id, lb_vs_id): return +def get_nsx_lbaas_listener_binding_by_vs_id(session, lb_vs_id): + try: + return session.query( + nsx_models.NsxLbaasListener).filter_by( + lb_vs_id=lb_vs_id).one() + except exc.NoResultFound: + return + + def delete_nsx_lbaas_listener_binding(session, loadbalancer_id, listener_id): return (session.query(nsx_models.NsxLbaasListener). filter_by(loadbalancer_id=loadbalancer_id, diff --git a/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD b/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD index 76862a8293..92707c7b04 100644 --- a/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD +++ b/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD @@ -1 +1 @@ -0dbeda408e41 +fc6308289aca diff --git a/vmware_nsx/db/migration/alembic_migrations/versions/stein/expand/fc6308289aca_lbaas_no_foreign_key.py b/vmware_nsx/db/migration/alembic_migrations/versions/stein/expand/fc6308289aca_lbaas_no_foreign_key.py new file mode 100644 index 0000000000..71f78f62c0 --- /dev/null +++ b/vmware_nsx/db/migration/alembic_migrations/versions/stein/expand/fc6308289aca_lbaas_no_foreign_key.py @@ -0,0 +1,51 @@ +# Copyright 2018 VMware, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""lbaas_no_foreign_key + +Revision ID: fc6308289aca +Revises: 0dbeda408e41 +Create Date: 2018-06-04 13:47:09.450116 +""" + +from alembic import op +from sqlalchemy.engine import reflection + +from neutron.db import migration + + +# revision identifiers, used by Alembic. +revision = 'fc6308289aca' +down_revision = '0dbeda408e41' +depends_on = ('717f7f63a219') + + +def upgrade(): + for table_name in ['nsxv3_lbaas_loadbalancers', + 'nsxv3_lbaas_listeners', + 'nsxv3_lbaas_pools', + 'nsxv3_lbaas_monitors', + 'nsxv3_lbaas_l7rules', + 'nsxv3_lbaas_l7policies', + 'nsxv_lbaas_loadbalancer_bindings', + 'nsxv_lbaas_listener_bindings', + 'nsxv_lbaas_pool_bindings', + 'nsxv_lbaas_monitor_bindings', + 'nsxv_lbaas_l7policy_bindings']: + + if migration.schema_has_table(table_name): + inspector = reflection.Inspector.from_engine(op.get_bind()) + fk_constraint = inspector.get_foreign_keys(table_name)[0] + op.drop_constraint(fk_constraint.get('name'), table_name, + type_='foreignkey') diff --git a/vmware_nsx/db/nsx_models.py b/vmware_nsx/db/nsx_models.py index 82e7aacb66..df9a4a277c 100644 --- a/vmware_nsx/db/nsx_models.py +++ b/vmware_nsx/db/nsx_models.py @@ -399,12 +399,7 @@ class NsxLbaasLoadbalancer(model_base.BASEV2, models.TimestampMixin): and NSX logical router id. """ __tablename__ = 'nsxv3_lbaas_loadbalancers' - fk_name = 'fk_nsxv3_lbaas_loadbalancers_id' - loadbalancer_id = sa.Column(sa.String(36), - sa.ForeignKey('lbaas_loadbalancers.id', - name=fk_name, - ondelete="CASCADE"), - primary_key=True) + loadbalancer_id = sa.Column(sa.String(36), primary_key=True) lb_router_id = sa.Column(sa.String(36), nullable=False) lb_service_id = sa.Column(sa.String(36), nullable=False) vip_address = sa.Column(sa.String(36), nullable=False) @@ -414,11 +409,7 @@ class NsxLbaasListener(model_base.BASEV2, models.TimestampMixin): """Stores the mapping between LBaaS listener and NSX LB virtual server""" __tablename__ = 'nsxv3_lbaas_listeners' loadbalancer_id = sa.Column(sa.String(36), primary_key=True) - listener_id = sa.Column(sa.String(36), - sa.ForeignKey('lbaas_listeners.id', - name='fk_nsxv3_lbaas_listeners_id', - ondelete="CASCADE"), - primary_key=True) + listener_id = sa.Column(sa.String(36), primary_key=True) app_profile_id = sa.Column(sa.String(36), nullable=False) lb_vs_id = sa.Column(sa.String(36), nullable=False) @@ -427,11 +418,7 @@ class NsxLbaasPool(model_base.BASEV2, models.TimestampMixin): """Stores the mapping between LBaaS pool and NSX LB Pool""" __tablename__ = 'nsxv3_lbaas_pools' loadbalancer_id = sa.Column(sa.String(36), primary_key=True) - pool_id = sa.Column(sa.String(36), - sa.ForeignKey('lbaas_pools.id', - name='fk_nsxv3_lbaas_pools_id', - ondelete="CASCADE"), - primary_key=True) + pool_id = sa.Column(sa.String(36), primary_key=True) lb_pool_id = sa.Column(sa.String(36), nullable=False) lb_vs_id = sa.Column(sa.String(36)) @@ -441,11 +428,7 @@ class NsxLbaasMonitor(model_base.BASEV2, models.TimestampMixin): __tablename__ = 'nsxv3_lbaas_monitors' loadbalancer_id = sa.Column(sa.String(36), primary_key=True) pool_id = sa.Column(sa.String(36), primary_key=True) - hm_id = sa.Column(sa.String(36), - sa.ForeignKey('lbaas_healthmonitors.id', - name='fk_nsxv3_lbaas_healthmonitors_id', - ondelete="CASCADE"), - primary_key=True) + hm_id = sa.Column(sa.String(36), primary_key=True) lb_monitor_id = sa.Column(sa.String(36), nullable=False) lb_pool_id = sa.Column(sa.String(36), nullable=False) @@ -462,11 +445,7 @@ class NsxLbaasL7Rule(model_base.BASEV2, models.TimestampMixin): __tablename__ = 'nsxv3_lbaas_l7rules' loadbalancer_id = sa.Column(sa.String(36), primary_key=True) l7policy_id = sa.Column(sa.String(36), primary_key=True) - l7rule_id = sa.Column(sa.String(36), - sa.ForeignKey('lbaas_l7rules.id', - name='fk_nsxv3_lbaas_l7rules_id', - ondelete="CASCADE"), - primary_key=True) + l7rule_id = sa.Column(sa.String(36), primary_key=True) lb_rule_id = sa.Column(sa.String(36), nullable=False) lb_vs_id = sa.Column(sa.String(36), nullable=False) @@ -474,11 +453,7 @@ class NsxLbaasL7Rule(model_base.BASEV2, models.TimestampMixin): class NsxLbaasL7Policy(model_base.BASEV2, models.TimestampMixin): """Stores the mapping between LBaaS l7policy and NSX LB rule""" __tablename__ = 'nsxv3_lbaas_l7policies' - l7policy_id = sa.Column(sa.String(36), - sa.ForeignKey('lbaas_l7policies.id', - name='fk_nsxv3_lbaas_l7policies_id', - ondelete="CASCADE"), - primary_key=True) + l7policy_id = sa.Column(sa.String(36), primary_key=True) lb_rule_id = sa.Column(sa.String(36), nullable=False) lb_vs_id = sa.Column(sa.String(36), nullable=False) diff --git a/vmware_nsx/db/nsxv_db.py b/vmware_nsx/db/nsxv_db.py index ba618e460d..aa65503a01 100644 --- a/vmware_nsx/db/nsxv_db.py +++ b/vmware_nsx/db/nsxv_db.py @@ -684,6 +684,15 @@ def add_nsxv_lbaas_loadbalancer_binding( return binding +def get_nsxv_lbaas_loadbalancer_bindings(session, filters=None, + like_filters=None): + session = db_api.get_reader_session() + query = session.query(nsxv_models.NsxvLbaasLoadbalancerBinding) + return nsx_db._apply_filters_to_query( + query, nsxv_models.NsxvLbaasLoadbalancerBinding, filters, + like_filters).all() + + def get_nsxv_lbaas_loadbalancer_binding(session, loadbalancer_id): try: return session.query( @@ -731,6 +740,15 @@ def del_nsxv_lbaas_listener_binding(session, loadbalancer_id, listener_id): listener_id=listener_id).delete()) +def get_nsxv_lbaas_listener_binding_by_vse(session, loadbalancer_id, vse_id): + try: + return session.query( + nsxv_models.NsxvLbaasListenerBinding).filter_by( + loadbalancer_id=loadbalancer_id, vse_id=vse_id).one() + except exc.NoResultFound: + return + + def add_nsxv_lbaas_pool_binding(session, loadbalancer_id, pool_id, edge_pool_id): with session.begin(subtransactions=True): diff --git a/vmware_nsx/db/nsxv_models.py b/vmware_nsx/db/nsxv_models.py index 9b5ae70a14..cfab7d54d3 100644 --- a/vmware_nsx/db/nsxv_models.py +++ b/vmware_nsx/db/nsxv_models.py @@ -252,11 +252,7 @@ class NsxvLbaasLoadbalancerBinding(model_base.BASEV2, models.TimestampMixin): __tablename__ = 'nsxv_lbaas_loadbalancer_bindings' - loadbalancer_id = sa.Column(sa.String(36), - sa.ForeignKey('lbaas_loadbalancers.id', - name='fk_lbaas_loadbalancers_id', - ondelete="CASCADE"), - primary_key=True) + loadbalancer_id = sa.Column(sa.String(36), primary_key=True) edge_id = sa.Column(sa.String(36), nullable=False) edge_fw_rule_id = sa.Column(sa.String(36), nullable=False) vip_address = sa.Column(sa.String(36), nullable=False) @@ -268,11 +264,7 @@ class NsxvLbaasListenerBinding(model_base.BASEV2, models.TimestampMixin): __tablename__ = 'nsxv_lbaas_listener_bindings' loadbalancer_id = sa.Column(sa.String(36), primary_key=True) - listener_id = sa.Column(sa.String(36), - sa.ForeignKey('lbaas_listeners.id', - name='fk_lbaas_listeners_id', - ondelete="CASCADE"), - primary_key=True) + listener_id = sa.Column(sa.String(36), primary_key=True) app_profile_id = sa.Column(sa.String(36), nullable=False) vse_id = sa.Column(sa.String(36), nullable=False) @@ -283,11 +275,7 @@ class NsxvLbaasPoolBinding(model_base.BASEV2, models.TimestampMixin): __tablename__ = 'nsxv_lbaas_pool_bindings' loadbalancer_id = sa.Column(sa.String(36), primary_key=True) - pool_id = sa.Column(sa.String(36), - sa.ForeignKey('lbaas_pools.id', - name='fk_lbaas_pools_id', - ondelete="CASCADE"), - primary_key=True) + pool_id = sa.Column(sa.String(36), primary_key=True) edge_pool_id = sa.Column(sa.String(36), nullable=False) @@ -298,11 +286,7 @@ class NsxvLbaasMonitorBinding(model_base.BASEV2, models.TimestampMixin): loadbalancer_id = sa.Column(sa.String(36), primary_key=True) pool_id = sa.Column(sa.String(36), primary_key=True) - hm_id = sa.Column(sa.String(36), - sa.ForeignKey('lbaas_healthmonitors.id', - name='fk_lbaas_healthmonitors_id', - ondelete="CASCADE"), - primary_key=True) + hm_id = sa.Column(sa.String(36), primary_key=True) edge_id = sa.Column(sa.String(36), primary_key=True) edge_mon_id = sa.Column(sa.String(36), nullable=False) @@ -322,11 +306,7 @@ class NsxvLbaasL7PolicyBinding(model_base.BASEV2, models.TimestampMixin): __tablename__ = 'nsxv_lbaas_l7policy_bindings' - policy_id = sa.Column(sa.String(36), - sa.ForeignKey('lbaas_l7policies.id', - name='fk_lbaas_l7policies_id', - ondelete="CASCADE"), - primary_key=True) + policy_id = sa.Column(sa.String(36), primary_key=True) edge_id = sa.Column(sa.String(36), nullable=False) edge_app_rule_id = sa.Column(sa.String(36), nullable=False) diff --git a/vmware_nsx/plugins/nsx_v/drivers/exclusive_router_driver.py b/vmware_nsx/plugins/nsx_v/drivers/exclusive_router_driver.py index 3f7ddf9959..5c58ccace4 100644 --- a/vmware_nsx/plugins/nsx_v/drivers/exclusive_router_driver.py +++ b/vmware_nsx/plugins/nsx_v/drivers/exclusive_router_driver.py @@ -16,7 +16,6 @@ from oslo_log import log as logging from neutron_lib import constants as n_consts from neutron_lib.db import api as db_api -from neutron_lib.plugins import constants as plugin_const from vmware_nsx._i18n import _ from vmware_nsx.common import exceptions as nsxv_exc @@ -26,6 +25,7 @@ from vmware_nsx.plugins.nsx_v.drivers import ( abstract_router_driver as router_driver) from vmware_nsx.plugins.nsx_v import plugin as nsx_v from vmware_nsx.plugins.nsx_v.vshield import edge_utils +from vmware_nsx.services.lbaas.octavia import constants as oct_const LOG = logging.getLogger(__name__) @@ -279,9 +279,10 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver): def _check_lb_on_subnet(self, context, subnet_id, router_id): # Check lbaas - dev_owner_v1 = 'neutron:' + plugin_const.LOADBALANCER - dev_owner_v2 = 'neutron:' + plugin_const.LOADBALANCERV2 - filters = {'device_owner': [dev_owner_v1, dev_owner_v2], + dev_owner_v1 = n_consts.DEVICE_OWNER_LOADBALANCER + dev_owner_v2 = n_consts.DEVICE_OWNER_LOADBALANCERV2 + dev_owner_oct = oct_const.DEVICE_OWNER_OCTAVIA + filters = {'device_owner': [dev_owner_v1, dev_owner_v2, dev_owner_oct], 'fixed_ips': {'subnet_id': [subnet_id]}} ports = super(nsx_v.NsxVPluginV2, self.plugin).get_ports( context, filters=filters) diff --git a/vmware_nsx/plugins/nsx_v/plugin.py b/vmware_nsx/plugins/nsx_v/plugin.py index f60d0567a9..3446ba15ba 100644 --- a/vmware_nsx/plugins/nsx_v/plugin.py +++ b/vmware_nsx/plugins/nsx_v/plugin.py @@ -143,6 +143,15 @@ from vmware_nsx.plugins.nsx_v.vshield import securitygroup_utils from vmware_nsx.plugins.nsx_v.vshield import vcns_driver from vmware_nsx.services.flowclassifier.nsx_v import utils as fc_utils from vmware_nsx.services.fwaas.nsx_v import fwaas_callbacks +from vmware_nsx.services.lbaas.nsx_v.implementation import healthmon_mgr +from vmware_nsx.services.lbaas.nsx_v.implementation import l7policy_mgr +from vmware_nsx.services.lbaas.nsx_v.implementation import l7rule_mgr +from vmware_nsx.services.lbaas.nsx_v.implementation import listener_mgr +from vmware_nsx.services.lbaas.nsx_v.implementation import loadbalancer_mgr +from vmware_nsx.services.lbaas.nsx_v.implementation import member_mgr +from vmware_nsx.services.lbaas.nsx_v.implementation import pool_mgr +from vmware_nsx.services.lbaas.octavia import constants as oct_const +from vmware_nsx.services.lbaas.octavia import octavia_listener LOG = logging.getLogger(__name__) PORTGROUP_PREFIX = 'dvportgroup' @@ -226,6 +235,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, def __init__(self): self._is_sub_plugin = tvd_utils.is_tvd_core_plugin() self.init_is_complete = False + self.octavia_listener = None + self.octavia_stats_collector = None self.housekeeper = None super(NsxVPluginV2, self).__init__() if self._is_sub_plugin: @@ -321,6 +332,10 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, # Bind QoS notifications qos_driver.register(self) + registry.subscribe(self.spawn_complete, + resources.PROCESS, + events.AFTER_SPAWN) + # subscribe the init complete method last, so it will be called only # if init was successful registry.subscribe(self.init_complete, @@ -335,6 +350,16 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, def is_tvd_plugin(): return False + def spawn_complete(self, resource, event, trigger, payload=None): + # This method should run only once, but after init_complete + if not self.init_is_complete: + self.init_complete(None, None, None) + + self.octavia_stats_collector = ( + octavia_listener.NSXOctaviaStatisticsCollector( + self, + listener_mgr.stats_getter)) + def init_complete(self, resource, event, trigger, payload=None): with locking.LockManager.get_lock('plugin-init-complete'): if self.init_is_complete: @@ -362,6 +387,18 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, hk_readonly=cfg.CONF.nsxv.housekeeping_readonly, hk_readonly_jobs=cfg.CONF.nsxv.housekeeping_readonly_jobs) + # Init octavia listener and endpoints + self.octavia_listener = octavia_listener.NSXOctaviaListener( + loadbalancer=loadbalancer_mgr.EdgeLoadBalancerManagerFromDict( + self.nsx_v), + listener=listener_mgr.EdgeListenerManagerFromDict(self.nsx_v), + pool=pool_mgr.EdgePoolManagerFromDict(self.nsx_v), + member=member_mgr.EdgeMemberManagerFromDict(self.nsx_v), + healthmonitor=healthmon_mgr.EdgeHealthMonitorManagerFromDict( + self.nsx_v), + l7policy=l7policy_mgr.EdgeL7PolicyManagerFromDict(self.nsx_v), + l7rule=l7rule_mgr.EdgeL7RuleManagerFromDict(self.nsx_v)) + self.init_is_complete = True def _validate_nsx_version(self): @@ -1851,7 +1888,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, def _assert_on_lb_port_admin_state(self, port_data, original_port, device_owner): - if device_owner == constants.DEVICE_OWNER_LOADBALANCERV2: + if device_owner in [constants.DEVICE_OWNER_LOADBALANCERV2, + oct_const.DEVICE_OWNER_OCTAVIA]: orig_state = original_port.get("admin_state_up") new_state = port_data.get("admin_state_up") if new_state is not None and (orig_state != new_state) and ( diff --git a/vmware_nsx/plugins/nsx_v3/plugin.py b/vmware_nsx/plugins/nsx_v3/plugin.py index 64fdb2e998..de0fbcf8cf 100644 --- a/vmware_nsx/plugins/nsx_v3/plugin.py +++ b/vmware_nsx/plugins/nsx_v3/plugin.py @@ -116,7 +116,16 @@ from vmware_nsx.plugins.nsx_v3 import utils as v3_utils from vmware_nsx.services.fwaas.common import utils as fwaas_utils from vmware_nsx.services.fwaas.nsx_v3 import fwaas_callbacks_v1 from vmware_nsx.services.fwaas.nsx_v3 import fwaas_callbacks_v2 +from vmware_nsx.services.lbaas.nsx_v3.implementation import healthmonitor_mgr +from vmware_nsx.services.lbaas.nsx_v3.implementation import l7policy_mgr +from vmware_nsx.services.lbaas.nsx_v3.implementation import l7rule_mgr +from vmware_nsx.services.lbaas.nsx_v3.implementation import listener_mgr +from vmware_nsx.services.lbaas.nsx_v3.implementation import loadbalancer_mgr +from vmware_nsx.services.lbaas.nsx_v3.implementation import member_mgr +from vmware_nsx.services.lbaas.nsx_v3.implementation import pool_mgr from vmware_nsx.services.lbaas.nsx_v3.v2 import lb_driver_v2 +from vmware_nsx.services.lbaas.octavia import constants as oct_const +from vmware_nsx.services.lbaas.octavia import octavia_listener from vmware_nsx.services.qos.common import utils as qos_com_utils from vmware_nsx.services.qos.nsx_v3 import driver as qos_driver from vmware_nsx.services.trunk.nsx_v3 import driver as trunk_driver @@ -211,6 +220,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, self.fwaas_callbacks = None self._is_sub_plugin = tvd_utils.is_tvd_core_plugin() self.init_is_complete = False + self.octavia_listener = None + self.octavia_stats_collector = None nsxlib_utils.set_is_attr_callback(validators.is_attr_set) self._extend_fault_map() if self._is_sub_plugin: @@ -293,6 +304,10 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, # Register NSXv3 trunk driver to support trunk extensions self.trunk_driver = trunk_driver.NsxV3TrunkDriver.create(self) + registry.subscribe(self.spawn_complete, + resources.PROCESS, + events.AFTER_SPAWN) + # subscribe the init complete method last, so it will be called only # if init was successful registry.subscribe(self.init_complete, @@ -430,6 +445,16 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, def is_tvd_plugin(): return False + def spawn_complete(self, resource, event, trigger, payload=None): + # This method should run only once, but after init_complete + if not self.init_is_complete: + self.init_complete(None, None, None) + + self.octavia_stats_collector = ( + octavia_listener.NSXOctaviaStatisticsCollector( + self, + listener_mgr.stats_getter)) + def init_complete(self, resource, event, trigger, payload=None): with locking.LockManager.get_lock('plugin-init-complete'): if self.init_is_complete: @@ -451,8 +476,25 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, hk_readonly=cfg.CONF.nsx_v3.housekeeping_readonly, hk_readonly_jobs=cfg.CONF.nsx_v3.housekeeping_readonly_jobs) + # Init octavia listener and endpoints + self._init_octavia() + self.init_is_complete = True + def _init_octavia(self): + if not self.nsxlib.feature_supported( + nsxlib_consts.FEATURE_LOAD_BALANCER): + return + + self.octavia_listener = octavia_listener.NSXOctaviaListener( + loadbalancer=loadbalancer_mgr.EdgeLoadBalancerManagerFromDict(), + listener=listener_mgr.EdgeListenerManagerFromDict(), + pool=pool_mgr.EdgePoolManagerFromDict(), + member=member_mgr.EdgeMemberManagerFromDict(), + healthmonitor=healthmonitor_mgr.EdgeHealthMonitorManagerFromDict(), + l7policy=l7policy_mgr.EdgeL7PolicyManagerFromDict(), + l7rule=l7rule_mgr.EdgeL7RuleManagerFromDict()) + def _extend_fault_map(self): """Extends the Neutron Fault Map. @@ -4479,7 +4521,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, port_data = self.get_port(context, port_id) device_owner = port_data.get('device_owner') fip_address = new_fip['floating_ip_address'] - if device_owner == const.DEVICE_OWNER_LOADBALANCERV2: + if (device_owner == const.DEVICE_OWNER_LOADBALANCERV2 or + device_owner == oct_const.DEVICE_OWNER_OCTAVIA): try: self._update_lb_vip(port_data, fip_address) except nsx_lib_exc.ManagerError: @@ -4508,7 +4551,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, port_data = self.get_port(context, port_id) device_owner = port_data.get('device_owner') fixed_ip_address = fip['fixed_ip_address'] - if device_owner == const.DEVICE_OWNER_LOADBALANCERV2: + if (device_owner == const.DEVICE_OWNER_LOADBALANCERV2 or + device_owner == oct_const.DEVICE_OWNER_OCTAVIA): # If the port is LB VIP port, after deleting the FIP, # update the virtual server VIP back to fixed IP. is_lb_port = True @@ -4551,7 +4595,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, old_port_data = self.get_port(context, old_port_id) old_device_owner = old_port_data['device_owner'] old_fixed_ip = old_fip['fixed_ip_address'] - if old_device_owner == const.DEVICE_OWNER_LOADBALANCERV2: + if (old_device_owner == const.DEVICE_OWNER_LOADBALANCERV2 or + old_device_owner == oct_const.DEVICE_OWNER_OCTAVIA): is_lb_port = True self._update_lb_vip(old_port_data, old_fixed_ip) @@ -4578,7 +4623,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, new_port_data = self.get_port(context, new_port_id) new_dev_own = new_port_data['device_owner'] new_fip_address = new_fip['floating_ip_address'] - if new_dev_own == const.DEVICE_OWNER_LOADBALANCERV2: + if (new_dev_own == const.DEVICE_OWNER_LOADBALANCERV2 or + new_dev_own == oct_const.DEVICE_OWNER_OCTAVIA): is_lb_port = True self._update_lb_vip(new_port_data, new_fip_address) diff --git a/vmware_nsx/services/lbaas/lb_const.py b/vmware_nsx/services/lbaas/lb_const.py index 8b4d249f62..cce1900cb4 100644 --- a/vmware_nsx/services/lbaas/lb_const.py +++ b/vmware_nsx/services/lbaas/lb_const.py @@ -96,6 +96,10 @@ LB_STATS_MAP = {'active_connections': 'current_sessions', 'bytes_in': 'bytes_in', 'bytes_out': 'bytes_out', 'total_connections': 'total_sessions'} +LB_EMPTY_STATS = {'active_connections': 0, + 'bytes_in': 0, + 'bytes_out': 0, + 'total_connections': 0} LR_ROUTER_TYPE = 'os-neutron-router-id' LR_PORT_TYPE = 'os-neutron-rport-id' LB_CERT_RESOURCE_TYPE = ['certificate_signed', 'certificate_self_signed'] diff --git a/vmware_nsx/services/lbaas/lb_translators.py b/vmware_nsx/services/lbaas/lb_translators.py index 9dfbde6ff0..d7b6570b3c 100644 --- a/vmware_nsx/services/lbaas/lb_translators.py +++ b/vmware_nsx/services/lbaas/lb_translators.py @@ -32,12 +32,14 @@ def lb_listener_obj_to_dict(listener): # Translate the LBaaS listener to a dictionary skipping the some objects # to avoid recursions listener_dict = listener.to_dict(loadbalancer=False, default_pool=False) + # Translate the default pool separately without it's internal objects if listener.default_pool: listener_dict['default_pool'] = lb_pool_obj_to_dict( listener.default_pool, with_listeners=False) else: listener_dict['default_pool'] = None + if listener.loadbalancer: listener_dict['loadbalancer'] = lb_loadbalancer_obj_to_dict( listener.loadbalancer) diff --git a/vmware_nsx/services/lbaas/nsx_v/implementation/listener_mgr.py b/vmware_nsx/services/lbaas/nsx_v/implementation/listener_mgr.py index 78a3a094f9..e3f2d8a5da 100644 --- a/vmware_nsx/services/lbaas/nsx_v/implementation/listener_mgr.py +++ b/vmware_nsx/services/lbaas/nsx_v/implementation/listener_mgr.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import copy + from oslo_log import helpers as log_helpers from oslo_log import log as logging from oslo_utils import excutils @@ -297,3 +299,45 @@ class EdgeListenerManagerFromDict(base_mgr.EdgeLoadbalancerBaseManager): listener['id']) completor(success=True) + + +def stats_getter(context, core_plugin, ignore_list=None): + """Update Octavia statistics for each listener (virtual server)""" + stat_list = [] + vcns = core_plugin.nsx_v.vcns + # go over all LB edges + bindings = nsxv_db.get_nsxv_lbaas_loadbalancer_bindings(context.session) + for binding in bindings: + lb_id = binding['loadbalancer_id'] + if ignore_list and lb_id in ignore_list: + continue + edge_id = binding['edge_id'] + + try: + lb_stats = vcns.get_loadbalancer_statistics(edge_id) + + virtual_servers_stats = lb_stats[1].get('virtualServer', []) + for vs_stats in virtual_servers_stats: + # Get the stats of the virtual server + stats = copy.copy(lb_const.LB_EMPTY_STATS) + stats['bytes_in'] += vs_stats.get('bytesIn', 0) + stats['bytes_out'] += vs_stats.get('bytesOut', 0) + stats['active_connections'] += vs_stats.get('curSessions', 0) + stats['total_connections'] += vs_stats.get('totalSessions', 0) + stats['request_errors'] = 0 # currently unsupported + + # Find the listener Id + vs_id = vs_stats.get('virtualServerId') + list_bind = nsxv_db.get_nsxv_lbaas_listener_binding_by_vse( + context.session, lb_id, vs_id) + if not list_bind: + continue + stats['id'] = list_bind['listener_id'] + + stat_list.append(stats) + + except vcns_exc.VcnsApiException as e: + LOG.warning('Failed to read load balancer statistics for %s: %s', + edge_id, e) + + return stat_list diff --git a/vmware_nsx/services/lbaas/nsx_v/implementation/loadbalancer_mgr.py b/vmware_nsx/services/lbaas/nsx_v/implementation/loadbalancer_mgr.py index 3d50ef2143..590c27ea06 100644 --- a/vmware_nsx/services/lbaas/nsx_v/implementation/loadbalancer_mgr.py +++ b/vmware_nsx/services/lbaas/nsx_v/implementation/loadbalancer_mgr.py @@ -31,6 +31,7 @@ from vmware_nsx.plugins.nsx_v.vshield.common import ( from vmware_nsx.plugins.nsx_v.vshield.common import exceptions as nsxv_exc from vmware_nsx.services.lbaas import base_mgr from vmware_nsx.services.lbaas.nsx_v import lbaas_common as lb_common +from vmware_nsx.services.lbaas.octavia import constants as oct_const LOG = logging.getLogger(__name__) @@ -159,33 +160,15 @@ class EdgeLoadBalancerManagerFromDict(base_mgr.EdgeLoadbalancerBaseManager): completor(success=True) def refresh(self, context, lb): - # TODO(kobis): implememnt + # TODO(kobis): implement pass def stats(self, context, lb): - stats = {'bytes_in': 0, - 'bytes_out': 0, - 'active_connections': 0, - 'total_connections': 0} - binding = nsxv_db.get_nsxv_lbaas_loadbalancer_binding(context.session, lb['id']) - try: - lb_stats = self.vcns.get_loadbalancer_statistics( - binding['edge_id']) - - except nsxv_exc.VcnsApiException: - msg = (_('Failed to read load balancer statistics, edge: %s') % - binding['edge_id']) - raise n_exc.BadRequest(resource='edge-lbaas', msg=msg) - - pools_stats = lb_stats[1].get('pool', []) - for pool_stats in pools_stats: - stats['bytes_in'] += pool_stats.get('bytesIn', 0) - stats['bytes_out'] += pool_stats.get('bytesOut', 0) - stats['active_connections'] += pool_stats.get('curSessions', 0) - stats['total_connections'] += pool_stats.get('totalSessions', 0) + stats = _get_edge_loadbalancer_statistics(self.vcns, + binding['edge_id']) return stats @@ -208,12 +191,15 @@ class EdgeLoadBalancerManagerFromDict(base_mgr.EdgeLoadbalancerBaseManager): subnet = self.core_plugin.get_subnet(context.elevated(), subnet_id) filters = {'fixed_ips': {'subnet_id': [subnet_id]}, - 'device_owner': [constants.DEVICE_OWNER_LOADBALANCERV2]} + 'device_owner': [constants.DEVICE_OWNER_LOADBALANCERV2, + oct_const.DEVICE_OWNER_OCTAVIA]} lb_ports = self.core_plugin.get_ports(context.elevated(), filters=filters) if lb_ports: for lb_port in lb_ports: + # TODO(asarfaty): for Octavia this code might need to change + # as the device_id is different if lb_port['device_id']: edge_bind = nsxv_db.get_nsxv_lbaas_loadbalancer_binding( context.session, lb_port['device_id']) @@ -238,3 +224,27 @@ class EdgeLoadBalancerManagerFromDict(base_mgr.EdgeLoadbalancerBaseManager): if not found: return False return True + + +def _get_edge_loadbalancer_statistics(vcns, edge_id): + stats = {'bytes_in': 0, + 'bytes_out': 0, + 'active_connections': 0, + 'total_connections': 0} + + try: + lb_stats = vcns.get_loadbalancer_statistics(edge_id) + + except nsxv_exc.VcnsApiException: + msg = (_('Failed to read load balancer statistics, edge: %s') % + edge_id) + raise n_exc.BadRequest(resource='edge-lbaas', msg=msg) + + pools_stats = lb_stats[1].get('pool', []) + for pool_stats in pools_stats: + stats['bytes_in'] += pool_stats.get('bytesIn', 0) + stats['bytes_out'] += pool_stats.get('bytesOut', 0) + stats['active_connections'] += pool_stats.get('curSessions', 0) + stats['total_connections'] += pool_stats.get('totalSessions', 0) + + return stats diff --git a/vmware_nsx/services/lbaas/nsx_v3/implementation/l7rule_mgr.py b/vmware_nsx/services/lbaas/nsx_v3/implementation/l7rule_mgr.py index 65defafddd..908114ab3b 100644 --- a/vmware_nsx/services/lbaas/nsx_v3/implementation/l7rule_mgr.py +++ b/vmware_nsx/services/lbaas/nsx_v3/implementation/l7rule_mgr.py @@ -46,6 +46,8 @@ class EdgeL7RuleManagerFromDict(base_mgr.Nsxv3LoadbalancerBaseManager): lb_rule_id = binding['lb_rule_id'] if delete: lb_utils.remove_rule_from_policy(rule) + else: + lb_utils.update_rule_in_policy(rule) rule_body = lb_utils.convert_l7policy_to_lb_rule( context, rule['policy']) try: diff --git a/vmware_nsx/services/lbaas/nsx_v3/implementation/lb_utils.py b/vmware_nsx/services/lbaas/nsx_v3/implementation/lb_utils.py index a4ee48cf01..10ef69bc38 100644 --- a/vmware_nsx/services/lbaas/nsx_v3/implementation/lb_utils.py +++ b/vmware_nsx/services/lbaas/nsx_v3/implementation/lb_utils.py @@ -203,3 +203,8 @@ def convert_l7policy_to_lb_rule(context, policy): def remove_rule_from_policy(rule): l7rules = rule['policy']['rules'] rule['policy']['rules'] = [r for r in l7rules if r['id'] != rule['id']] + + +def update_rule_in_policy(rule): + remove_rule_from_policy(rule) + rule['policy']['rules'].append(rule) diff --git a/vmware_nsx/services/lbaas/nsx_v3/implementation/listener_mgr.py b/vmware_nsx/services/lbaas/nsx_v3/implementation/listener_mgr.py index 6ea4cbffe2..b5094caf13 100644 --- a/vmware_nsx/services/lbaas/nsx_v3/implementation/listener_mgr.py +++ b/vmware_nsx/services/lbaas/nsx_v3/implementation/listener_mgr.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import copy + from neutron_lib import exceptions as n_exc from oslo_log import helpers as log_helpers from oslo_log import log as logging @@ -281,3 +283,42 @@ class EdgeListenerManagerFromDict(base_mgr.Nsxv3LoadbalancerBaseManager): context.session, lb_id, listener['id']) completor(success=True) + + +def stats_getter(context, core_plugin, ignore_list=None): + """Update Octavia statistics for each listener (virtual server)""" + stat_list = [] + lb_service_client = core_plugin.nsxlib.load_balancer.service + # Go over all the loadbalancers & services + lb_bindings = nsx_db.get_nsx_lbaas_loadbalancer_bindings( + context.session) + for lb_binding in lb_bindings: + if ignore_list and lb_binding['loadbalancer_id'] in ignore_list: + continue + + lb_service_id = lb_binding.get('lb_service_id') + LOG.debug("Getting listeners statistics for NSX lb service %s", + lb_service_id) + try: + # get the NSX statistics for this LB service + rsp = lb_service_client.get_stats(lb_service_id) + if rsp and 'virtual_servers' in rsp: + # Go over each virtual server in the response + for vs in rsp['virtual_servers']: + # look up the virtual server in the DB + vs_bind = nsx_db.get_nsx_lbaas_listener_binding_by_vs_id( + context.session, vs['virtual_server_id']) + if vs_bind: + vs_stats = vs['statistics'] + stats = copy.copy(lb_const.LB_EMPTY_STATS) + stats['id'] = vs_bind.listener_id + stats['request_errors'] = 0 # currently unsupported + for stat in lb_const.LB_STATS_MAP: + lb_stat = lb_const.LB_STATS_MAP[stat] + stats[stat] += vs_stats[lb_stat] + stat_list.append(stats) + + except nsxlib_exc.ManagerError: + pass + + return stat_list diff --git a/vmware_nsx/services/lbaas/nsx_v3/implementation/loadbalancer_mgr.py b/vmware_nsx/services/lbaas/nsx_v3/implementation/loadbalancer_mgr.py index 3c31f5e9a0..0b6a5928f6 100644 --- a/vmware_nsx/services/lbaas/nsx_v3/implementation/loadbalancer_mgr.py +++ b/vmware_nsx/services/lbaas/nsx_v3/implementation/loadbalancer_mgr.py @@ -192,10 +192,10 @@ class EdgeLoadBalancerManagerFromDict(base_mgr.Nsxv3LoadbalancerBaseManager): for vs in vs_statuses.get('results', []): vs_status = self._nsx_status_to_lb_status(vs.get('status')) vs_id = vs.get('virtual_server_id') - listener_binding = nsx_db.get_nsx_lbaas_listener_binding_by_vs( + list_binding = nsx_db.get_nsx_lbaas_listener_binding_by_lb_and_vs( context.session, id, vs_id) - if listener_binding: - listener_id = listener_binding['listener_id'] + if list_binding: + listener_id = list_binding['listener_id'] statuses[lb_const.LISTENERS].append( {'id': listener_id, 'status': vs_status}) diff --git a/vmware_nsx/services/lbaas/nsx_v3/implementation/member_mgr.py b/vmware_nsx/services/lbaas/nsx_v3/implementation/member_mgr.py index b9a90e9cac..7ab0df6e72 100644 --- a/vmware_nsx/services/lbaas/nsx_v3/implementation/member_mgr.py +++ b/vmware_nsx/services/lbaas/nsx_v3/implementation/member_mgr.py @@ -65,10 +65,15 @@ class EdgeMemberManagerFromDict(base_mgr.Nsxv3LoadbalancerBaseManager): tenant_id, context.project_name) attachment = {'target_id': nsx_router_id, 'target_type': 'LogicalRouter'} - lb_service = service_client.create(display_name=lb_name, - tags=tags, - attachment=attachment, - size=lb_size) + try: + lb_service = service_client.create(display_name=lb_name, + tags=tags, + attachment=attachment, + size=lb_size) + except nsxlib_exc.ManagerError as e: + LOG.error("Failed to create LB service: %s", e) + return + # Update router to enable advertise_lb_vip flag self.core_plugin.nsxlib.logical_router.update_advertisement( nsx_router_id, advertise_lb_vip=True) diff --git a/vmware_nsx/services/lbaas/nsx_v3/v2/lb_driver_v2.py b/vmware_nsx/services/lbaas/nsx_v3/v2/lb_driver_v2.py index f6b8d16ead..b0dd1d6d0d 100644 --- a/vmware_nsx/services/lbaas/nsx_v3/v2/lb_driver_v2.py +++ b/vmware_nsx/services/lbaas/nsx_v3/v2/lb_driver_v2.py @@ -16,8 +16,8 @@ from neutron_lib.callbacks import events from neutron_lib.callbacks import registry from neutron_lib.callbacks import resources +from neutron_lib import constants as n_consts from neutron_lib import exceptions as n_exc -from neutron_lib.plugins import constants as plugin_const from oslo_log import helpers as log_helpers from oslo_log import log as logging @@ -33,6 +33,7 @@ from vmware_nsx.services.lbaas.nsx_v3.implementation import listener_mgr from vmware_nsx.services.lbaas.nsx_v3.implementation import loadbalancer_mgr from vmware_nsx.services.lbaas.nsx_v3.implementation import member_mgr from vmware_nsx.services.lbaas.nsx_v3.implementation import pool_mgr +from vmware_nsx.services.lbaas.octavia import constants as oct_const LOG = logging.getLogger(__name__) @@ -108,6 +109,9 @@ class EdgeLoadbalancerDriverV2(base_mgr.LoadbalancerBaseManager): # Check if there is any LB attachment for the NSX router. # This callback is subscribed here to prevent router/GW/interface # deletion if it still has LB service attached to it. + + #Note(asarfaty): Those callbacks are used by Octavia as well even + # though they are bound only here registry.subscribe(self._check_lb_service_on_router, resources.ROUTER, events.BEFORE_DELETE) registry.subscribe(self._check_lb_service_on_router, @@ -124,8 +128,9 @@ class EdgeLoadbalancerDriverV2(base_mgr.LoadbalancerBaseManager): resources.ROUTER_INTERFACE, events.BEFORE_DELETE) def _get_lb_ports(self, context, subnet_ids): - dev_owner = 'neutron:' + plugin_const.LOADBALANCERV2 - filters = {'device_owner': [dev_owner], + dev_owner_v2 = n_consts.DEVICE_OWNER_LOADBALANCERV2 + dev_owner_oct = oct_const.DEVICE_OWNER_OCTAVIA + filters = {'device_owner': [dev_owner_v2, dev_owner_oct], 'fixed_ips': {'subnet_id': subnet_ids}} return self.loadbalancer.core_plugin.get_ports( context, filters=filters) diff --git a/vmware_nsx/services/lbaas/octavia/__init__.py b/vmware_nsx/services/lbaas/octavia/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/vmware_nsx/services/lbaas/octavia/constants.py b/vmware_nsx/services/lbaas/octavia/constants.py new file mode 100644 index 0000000000..fbe629bda3 --- /dev/null +++ b/vmware_nsx/services/lbaas/octavia/constants.py @@ -0,0 +1,45 @@ +# Copyright 2018 VMware, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +OCTAVIA_TO_DRIVER_TOPIC = 'vmware_nsx__lb_listener' +DRIVER_TO_OCTAVIA_TOPIC = 'vmware_nsx__driver_listener' + +LOADBALANCER = 'loadbalancer' +LISTENER = 'listener' +POOL = 'pool' +HEALTHMONITOR = 'healthmonitor' +MEMBER = 'member' +L7POLICY = 'l7policy' +L7RULE = 'l7rule' + +LOADBALANCERS = 'loadbalancers' +LISTENERS = 'listeners' +POOLS = 'pools' +HEALTHMONITORS = 'healthmonitors' +MEMBERS = 'members' +L7POLICIES = 'l7policies' +L7RULES = 'l7rules' + +ONLINE = 'ONLINE' +OFFLINE = 'OFFLINE' +ERROR = 'ERROR' +ACTIVE = 'ACTIVE' +DELETED = 'DELETED' +ERROR = 'ERROR' + +OPERATING_STATUS = 'operating_status' +PROVISIONING_STATUS = 'provisioning_status' + +DEVICE_OWNER_OCTAVIA = 'Octavia' diff --git a/vmware_nsx/services/lbaas/octavia/octavia_driver.py b/vmware_nsx/services/lbaas/octavia/octavia_driver.py new file mode 100644 index 0000000000..a997f27e17 --- /dev/null +++ b/vmware_nsx/services/lbaas/octavia/octavia_driver.py @@ -0,0 +1,507 @@ +# Copyright 2018 VMware, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import socket + +from oslo_config import cfg +from oslo_log import helpers as log_helpers +from oslo_log import log as logging +import oslo_messaging as messaging +from oslo_messaging.rpc import dispatcher +import pecan +from stevedore import driver as stevedore_driver + +from octavia.api.drivers import driver_lib +from octavia.api.drivers import exceptions +from octavia.api.drivers import provider_base as driver_base +from octavia.api.drivers import utils as oct_utils +from octavia.db import api as db_apis +from octavia.db import repositories + +from vmware_nsx.services.lbaas.octavia import constants as d_const + + +LOG = logging.getLogger(__name__) +cfg.CONF.import_group('oslo_messaging', 'octavia.common.config') + +# List of keys per object type that will not be sent to the listener +unsupported_keys = {'Loadbalancer': ['vip_qos_policy_id'], + 'Listener': ['sni_container_refs', + 'insert_headers', + 'timeout_client_data', + 'timeout_member_connect', + 'timeout_member_data', + 'timeout_tcp_inspect'], + 'HealthMonitor': ['expected_codes', 'max_retries_down'], + 'Member': ['monitor_address', 'monitor_port', 'backup']} + + +class NSXOctaviaDriver(driver_base.ProviderDriver): + @log_helpers.log_method_call + def __init__(self): + super(NSXOctaviaDriver, self).__init__() + self._init_rpc_messaging() + self._init_rpc_listener() + self._init_cert_manager() + self.repositories = repositories.Repositories() + + @log_helpers.log_method_call + def _init_rpc_messaging(self): + topic = d_const.OCTAVIA_TO_DRIVER_TOPIC + transport = messaging.get_rpc_transport(cfg.CONF) + target = messaging.Target(topic=topic, exchange="common", + namespace='control', fanout=False, + version='1.0') + self.client = messaging.RPCClient(transport, target) + + @log_helpers.log_method_call + def _init_rpc_listener(self): + # Initialize RPC listener + topic = d_const.DRIVER_TO_OCTAVIA_TOPIC + server = socket.gethostname() + transport = messaging.get_rpc_transport(cfg.CONF) + target = messaging.Target(topic=topic, server=server, + exchange="common", fanout=False) + endpoints = [NSXOctaviaDriverEndpoint()] + access_policy = dispatcher.DefaultRPCAccessPolicy + self.octavia_server = messaging.get_rpc_server( + transport, target, endpoints, executor='threading', + access_policy=access_policy) + self.octavia_server.start() + + @log_helpers.log_method_call + def _init_cert_manager(self): + self.cert_manager = stevedore_driver.DriverManager( + namespace='octavia.cert_manager', + name=cfg.CONF.certificates.cert_manager, + invoke_on_load=True).driver + + def get_obj_project_id(self, obj_type, obj_dict): + if obj_dict.get('project_id'): + return obj_dict['project_id'] + if obj_dict.get('tenant_id'): + return obj_dict['tenant_id'] + + # look for the project id of the attached objects + project_id = None + if obj_dict.get('loadbalancer_id'): + db_lb = self.repositories.load_balancer.get( + db_apis.get_session(), id=obj_dict['loadbalancer_id']) + if db_lb: + project_id = db_lb.project_id + if not project_id and obj_dict.get('pool_id'): + db_pool = self.repositories.pool.get( + db_apis.get_session(), id=obj_dict['pool_id']) + if db_pool: + project_id = db_pool.load_balancer.project_id + if not project_id and obj_dict.get('listener_id'): + db_list = self.repositories.listener.get( + db_apis.get_session(), id=obj_dict['listener_id']) + if db_list: + project_id = db_list.load_balancer.project_id + if not project_id and obj_dict.get('l7policy_id'): + db_policy = self.repositories.l7policy.get( + db_apis.get_session(), id=obj_dict['l7policy_id']) + if db_policy: + if db_policy.listener: + db_lb = db_policy.listener.load_balancer + elif db_policy.redirect_pool: + db_lb = db_policy.redirect_pool.load_balancer + if db_lb: + project_id = db_lb.project_id + + if not project_id: + LOG.warning("Could bot find the tenant id for %(type)s " + "%(obj)s", {'type': obj_type, 'obj': obj_dict}) + return project_id + + def _get_load_balancer_dict(self, loadbalancer_id): + if not loadbalancer_id: + return + db_lb = self.repositories.load_balancer.get( + db_apis.get_session(), id=loadbalancer_id) + if not db_lb: + return + lb_dict = {'name': db_lb.name, 'id': loadbalancer_id} + if db_lb.vip: + lb_dict['vip_port_id'] = db_lb.vip.port_id + lb_dict['vip_address'] = db_lb.vip.ip_address + lb_dict['vip_port_id'] = db_lb.vip.port_id + lb_dict['vip_network_id'] = db_lb.vip.network_id + lb_dict['vip_subnet_id'] = db_lb.vip.subnet_id + return lb_dict + + def _get_listener_in_pool_dict(self, pool_dict): + if 'listener' not in pool_dict: + if pool_dict.get('listener_id'): + db_listener = self.repositories.listener.get( + db_apis.get_session(), id=pool_dict['listener_id']) + listener_obj = oct_utils.db_listener_to_provider_listener( + db_listener) + listener_dict = listener_obj.to_dict( + recurse=False, render_unsets=True) + listener_dict['id'] = listener_dict['listener_id'] + listener_dict['l7_policies'] = listener_dict['l7policies'] + pool_dict['listener'] = listener_dict + if 'listeners' not in pool_dict: + # multiple listeners is not really supported yet + pool_dict['listeners'] = [listener_dict] + else: + pool_dict['listener'] = None + if 'listeners' not in pool_dict: + pool_dict['listeners'] = [] + + def _get_pool_dict(self, pool_id): + if not pool_id: + return + db_pool = self.repositories.pool.get(db_apis.get_session(), id=pool_id) + if not db_pool: + return + pool_obj = oct_utils.db_pool_to_provider_pool(db_pool) + pool_dict = pool_obj.to_dict(recurse=True, render_unsets=True) + pool_dict['id'] = pool_id + # Get the load balancer object + if pool_dict.get('loadbalancer_id'): + # Generate a loadbalancer object + pool_dict['loadbalancer'] = self._get_load_balancer_dict( + pool_dict['loadbalancer_id']) + if 'listener' not in pool_dict: + self._get_listener_in_pool_dict(pool_dict) + return pool_dict + + def update_policy_dict(self, policy_dict, policy_obj, is_update=False): + if policy_dict.get('listener_id'): + db_list = self.repositories.listener.get( + db_apis.get_session(), id=policy_dict['listener_id']) + list_obj = oct_utils.db_listener_to_provider_listener(db_list) + list_dict = list_obj.to_dict(recurse=True, render_unsets=True) + list_dict['id'] = policy_dict['listener_id'] + policy_dict['listener'] = list_dict + if policy_obj.rules: + policy_dict['rules'] = [] + for rule in policy_obj.rules: + rule_dict = rule.to_dict(recurse=False, render_unsets=True) + rule_dict['id'] = rule_dict['l7rule_id'] + policy_dict['rules'].append(rule_dict) + elif not is_update: + policy_dict['rules'] = [] + + def _remove_unsupported_keys(self, obj_type, obj_dict): + for key in unsupported_keys.get(obj_type, []): + if key in obj_dict: + if obj_dict.get(key): + LOG.warning("Ignoring %(key)s:%(val)s in %(type)s as the " + "NSX plugin does not currently support it", + {'key': key, 'val': obj_dict[key], + 'type': obj_type}) + del obj_dict[key] + + def obj_to_dict(self, obj, is_update=False, project_id=None): + obj_type = obj.__class__.__name__ + # create a dictionary out of the object + render_unsets = False if is_update else True + obj_dict = obj.to_dict(recurse=True, render_unsets=render_unsets) + + # Update the dictionary to match what the nsx driver expects + if not project_id: + project_id = self.get_obj_project_id(obj_type, obj_dict) + obj_dict['tenant_id'] = obj_dict['project_id'] = project_id + + if 'id' not in obj_dict: + obj_dict['id'] = obj_dict.get('%s_id' % obj_type.lower()) + + if not obj_dict.get('name') and not is_update: + obj_dict['name'] = "" + + self._remove_unsupported_keys(obj_type, obj_dict) + + if obj_type == 'LoadBalancer': + # clean listeners and pools for update case: + if 'listeners' in obj_dict: + if is_update and not obj_dict['listeners']: + del obj_dict['listeners'] + else: + if obj_dict['listeners'] is None: + obj_dict['listeners'] = [] + for listener in obj_dict['listeners']: + listener['id'] = listener['listener_id'] + if 'pools' in obj_dict: + if is_update and not obj_dict['pools']: + del obj_dict['pools'] + else: + if obj_dict['pools'] is None: + obj_dict['pools'] = [] + for pool in obj_dict['pools']: + pool['id'] = pool['pool_id'] + + elif obj_type == 'Listener': + if 'l7policies' in obj_dict: + obj_dict['l7_policies'] = obj_dict['l7policies'] + if obj_dict.get('loadbalancer_id'): + # Generate a loadbalancer object + obj_dict['loadbalancer'] = self._get_load_balancer_dict( + obj_dict['loadbalancer_id']) + # TODO(asarfaty): add default_tls_container_id + + elif obj_type == 'Pool': + if 'listener' not in obj_dict: + self._get_listener_in_pool_dict(obj_dict) + + elif obj_type == 'Member': + # Get the pool object + if obj_dict.get('pool_id'): + obj_dict['pool'] = self._get_pool_dict(obj_dict['pool_id']) + obj_dict['loadbalancer'] = None + if 'loadbalancer' in obj_dict['pool']: + obj_dict['loadbalancer'] = obj_dict['pool']['loadbalancer'] + if not obj_dict.get('subnet_id'): + # Use the parent vip_subnet_id instead + obj_dict['subnet_id'] = obj_dict['loadbalancer'][ + 'vip_subnet_id'] + else: + obj_dict['pool'] = None + obj_dict['loadbalancer'] = None + + elif obj_type == 'HealthMonitor': + # Get the pool object + if obj_dict.get('pool_id'): + obj_dict['pool'] = self._get_pool_dict(obj_dict['pool_id']) + + elif obj_type == 'L7Policy': + self.update_policy_dict(obj_dict, obj, is_update=is_update) + + elif obj_type == 'L7Rule': + # Get the L7 policy object + if obj_dict.get('l7policy_id'): + db_policy = self.repositories.l7policy.get( + db_apis.get_session(), id=obj_dict['l7policy_id']) + policy_obj = oct_utils.db_l7policy_to_provider_l7policy( + db_policy) + policy_dict = policy_obj.to_dict( + recurse=True, render_unsets=True) + policy_dict['id'] = obj_dict['l7policy_id'] + self.update_policy_dict( + policy_dict, policy_obj, is_update=is_update) + obj_dict['policy'] = policy_dict + + LOG.debug("Translated %(type)s to dictionary: %(obj)s", + {'type': obj_type, 'obj': obj_dict}) + return obj_dict + + # Load Balancer + @log_helpers.log_method_call + def create_vip_port(self, loadbalancer_id, project_id, vip_dictionary): + raise exceptions.NotImplementedError() + + @log_helpers.log_method_call + def loadbalancer_create(self, loadbalancer): + kw = {'loadbalancer': self.obj_to_dict(loadbalancer)} + self.client.cast({}, 'loadbalancer_create', **kw) + + @log_helpers.log_method_call + def loadbalancer_delete(self, loadbalancer, cascade=False): + if cascade: + # TODO(asarfaty) add support for cascade + LOG.warning("The NSX Octavia driver does not support loadbalancer " + "delete cascade") + raise exceptions.NotImplementedError() + kw = {'loadbalancer': self.obj_to_dict(loadbalancer), + 'cascade': cascade} + self.client.cast({}, 'loadbalancer_delete', **kw) + + @log_helpers.log_method_call + def loadbalancer_failover(self, loadbalancer_id): + LOG.error('Loadbalancer failover is handled by platform') + raise exceptions.NotImplementedError() + + @log_helpers.log_method_call + def loadbalancer_update(self, old_loadbalancer, new_loadbalancer): + old_dict = self.obj_to_dict(old_loadbalancer) + new_dict = copy.deepcopy(old_dict) + new_dict.update(self.obj_to_dict( + new_loadbalancer, is_update=True, + project_id=old_dict.get('project_id'))) + kw = {'old_loadbalancer': old_dict, + 'new_loadbalancer': new_dict} + self.client.cast({}, 'loadbalancer_update', **kw) + + # Listener + @log_helpers.log_method_call + def listener_create(self, listener): + cert = None + dict_list = self.obj_to_dict(listener) + if dict_list.get('tls_certificate_id'): + context = pecan.request.context.get('octavia_context') + cert = self.cert_manager.get_cert(context, + dict_list['tls_certificate_id']) + kw = {'listener': dict_list, 'cert': cert} + self.client.cast({}, 'listener_create', **kw) + + @log_helpers.log_method_call + def listener_delete(self, listener): + kw = {'listener': self.obj_to_dict(listener)} + self.client.cast({}, 'listener_delete', **kw) + + @log_helpers.log_method_call + def listener_update(self, old_listener, new_listener): + old_dict = self.obj_to_dict(old_listener) + new_dict = copy.deepcopy(old_dict) + new_dict.update(self.obj_to_dict( + new_listener, is_update=True, + project_id=old_dict.get('project_id'))) + cert = None + if new_dict.get('tls_certificate_id'): + context = pecan.request.context.get('octavia_context') + cert = self.cert_manager.get_cert(context, + new_dict['tls_certificate_id']) + kw = {'old_listener': old_dict, + 'new_listener': new_dict, + 'cert': cert} + self.client.cast({}, 'listener_update', **kw) + + # Pool + @log_helpers.log_method_call + def pool_create(self, pool): + kw = {'pool': self.obj_to_dict(pool)} + self.client.cast({}, 'pool_create', **kw) + + @log_helpers.log_method_call + def pool_delete(self, pool): + kw = {'pool': self.obj_to_dict(pool)} + self.client.cast({}, 'pool_delete', **kw) + + @log_helpers.log_method_call + def pool_update(self, old_pool, new_pool): + old_dict = self.obj_to_dict(old_pool) + new_dict = copy.deepcopy(old_dict) + new_dict.update(self.obj_to_dict( + new_pool, is_update=True, project_id=old_dict.get('project_id'))) + kw = {'old_pool': old_dict, + 'new_pool': new_dict} + self.client.cast({}, 'pool_update', **kw) + + # Member + @log_helpers.log_method_call + def member_create(self, member): + kw = {'member': self.obj_to_dict(member)} + self.client.cast({}, 'member_create', **kw) + + @log_helpers.log_method_call + def member_delete(self, member): + kw = {'member': self.obj_to_dict(member)} + self.client.cast({}, 'member_delete', **kw) + + @log_helpers.log_method_call + def member_update(self, old_member, new_member): + old_dict = self.obj_to_dict(old_member) + new_dict = copy.deepcopy(old_dict) + new_dict.update(self.obj_to_dict( + new_member, is_update=True, project_id=old_dict.get('project_id'))) + kw = {'old_member': old_dict, + 'new_member': new_dict} + self.client.cast({}, 'member_update', **kw) + + @log_helpers.log_method_call + def member_batch_update(self, members): + raise NotImplementedError() + + # Health Monitor + @log_helpers.log_method_call + def health_monitor_create(self, healthmonitor): + kw = {'healthmonitor': self.obj_to_dict(healthmonitor)} + self.client.cast({}, 'healthmonitor_create', **kw) + + @log_helpers.log_method_call + def health_monitor_delete(self, healthmonitor): + kw = {'healthmonitor': self.obj_to_dict(healthmonitor)} + self.client.cast({}, 'healthmonitor_delete', **kw) + + @log_helpers.log_method_call + def health_monitor_update(self, old_healthmonitor, new_healthmonitor): + old_dict = self.obj_to_dict(old_healthmonitor) + new_dict = copy.deepcopy(old_dict) + new_dict.update(self.obj_to_dict( + new_healthmonitor, is_update=True, + project_id=old_dict.get('project_id'))) + kw = {'old_healthmonitor': old_dict, + 'new_healthmonitor': new_dict} + self.client.cast({}, 'healthmonitor_update', **kw) + + # L7 Policy + @log_helpers.log_method_call + def l7policy_create(self, l7policy): + kw = {'l7policy': self.obj_to_dict(l7policy)} + self.client.cast({}, 'l7policy_create', **kw) + + @log_helpers.log_method_call + def l7policy_delete(self, l7policy): + kw = {'l7policy': self.obj_to_dict(l7policy)} + self.client.cast({}, 'l7policy_delete', **kw) + + @log_helpers.log_method_call + def l7policy_update(self, old_l7policy, new_l7policy): + old_dict = self.obj_to_dict(old_l7policy) + new_dict = copy.deepcopy(old_dict) + new_dict.update(self.obj_to_dict( + new_l7policy, is_update=True, + project_id=old_dict.get('project_id'))) + kw = {'old_l7policy': old_dict, + 'new_l7policy': new_dict} + self.client.cast({}, 'l7policy_update', **kw) + + # L7 Rule + @log_helpers.log_method_call + def l7rule_create(self, l7rule): + kw = {'l7rule': self.obj_to_dict(l7rule)} + self.client.cast({}, 'l7rule_create', **kw) + + @log_helpers.log_method_call + def l7rule_delete(self, l7rule): + kw = {'l7rule': self.obj_to_dict(l7rule)} + self.client.cast({}, 'l7rule_delete', **kw) + + @log_helpers.log_method_call + def l7rule_update(self, old_l7rule, new_l7rule): + old_dict = self.obj_to_dict(old_l7rule) + new_dict = copy.deepcopy(old_dict) + new_dict.update(self.obj_to_dict( + new_l7rule, is_update=True, project_id=old_dict.get('project_id'))) + kw = {'old_l7rule': old_dict, + 'new_l7rule': new_dict} + self.client.cast({}, 'l7rule_update', **kw) + + # Flavor + @log_helpers.log_method_call + def get_supported_flavor_metadata(self): + raise exceptions.NotImplementedError() + + @log_helpers.log_method_call + def validate_flavor(self, flavor_metadata): + raise exceptions.NotImplementedError() + + +class NSXOctaviaDriverEndpoint(driver_lib.DriverLibrary): + target = messaging.Target(namespace="control", version='1.0') + + @log_helpers.log_method_call + def update_loadbalancer_status(self, ctxt, status): + return super(NSXOctaviaDriverEndpoint, + self).update_loadbalancer_status(status) + + @log_helpers.log_method_call + def update_listener_statistics(self, ctxt, statistics): + return super(NSXOctaviaDriverEndpoint, + self).update_listener_statistics(statistics) diff --git a/vmware_nsx/services/lbaas/octavia/octavia_listener.py b/vmware_nsx/services/lbaas/octavia/octavia_listener.py new file mode 100644 index 0000000000..bb2a58aa64 --- /dev/null +++ b/vmware_nsx/services/lbaas/octavia/octavia_listener.py @@ -0,0 +1,368 @@ +# Copyright 2018 VMware, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import socket +import time + +import eventlet + +from neutron_lib import context as neutron_context +from oslo_config import cfg +from oslo_log import helpers as log_helpers +from oslo_log import log as logging +import oslo_messaging as messaging +from oslo_messaging.rpc import dispatcher + +from neutron_lbaas.db.loadbalancer import models + +from vmware_nsx.services.lbaas.octavia import constants + +LOG = logging.getLogger(__name__) + + +class NSXOctaviaListener(object): + @log_helpers.log_method_call + def __init__(self, loadbalancer=None, listener=None, pool=None, + member=None, healthmonitor=None, l7policy=None, l7rule=None): + self._init_rpc_messaging() + self._init_rpc_listener(healthmonitor, l7policy, l7rule, listener, + loadbalancer, member, pool) + + def _init_rpc_messaging(self): + topic = constants.DRIVER_TO_OCTAVIA_TOPIC + transport = messaging.get_rpc_transport(cfg.CONF) + target = messaging.Target(topic=topic, exchange="common", + namespace='control', fanout=False, + version='1.0') + self.client = messaging.RPCClient(transport, target) + + def _init_rpc_listener(self, healthmonitor, l7policy, l7rule, listener, + loadbalancer, member, pool): + # Initialize RPC listener + topic = constants.OCTAVIA_TO_DRIVER_TOPIC + server = socket.gethostname() + transport = messaging.get_rpc_transport(cfg.CONF) + target = messaging.Target(topic=topic, server=server, + exchange="common", fanout=False) + self.endpoints = [NSXOctaviaListenerEndpoint( + client=self.client, loadbalancer=loadbalancer, listener=listener, + pool=pool, member=member, healthmonitor=healthmonitor, + l7policy=l7policy, l7rule=l7rule)] + access_policy = dispatcher.DefaultRPCAccessPolicy + self.octavia_server = messaging.get_rpc_server( + transport, target, self.endpoints, executor='eventlet', + access_policy=access_policy) + self.octavia_server.start() + + +class NSXOctaviaListenerEndpoint(object): + target = messaging.Target(namespace="control", version='1.0') + + def __init__(self, client=None, loadbalancer=None, listener=None, + pool=None, member=None, healthmonitor=None, l7policy=None, + l7rule=None): + + self.client = client + self.loadbalancer = loadbalancer + self.listener = listener + self.pool = pool + self.member = member + self.healthmonitor = healthmonitor + self.l7policy = l7policy + self.l7rule = l7rule + + def get_completor_func(self, obj_type, obj, delete=False): + # return a method that will be called on success/failure completion + def completor_func(success=True): + LOG.debug("Octavia transaction completed. status %s", + 'success' if success else 'failure') + + # calculate the provisioning and operating statuses + main_prov_status = constants.ACTIVE + parent_prov_status = constants.ACTIVE + if not success: + main_prov_status = constants.ERROR + parent_prov_status = constants.ERROR + elif delete: + main_prov_status = constants.DELETED + op_status = constants.ONLINE if success else constants.ERROR + + # add the status of the created/deleted/updated object + status_dict = { + obj_type: [{ + 'id': obj['id'], + constants.PROVISIONING_STATUS: main_prov_status, + constants.OPERATING_STATUS: op_status}]} + + # Get all its parents, and update their statuses as well + loadbalancer_id = None + listener_id = None + pool_id = None + policy_id = None + if obj_type != constants.LOADBALANCERS: + loadbalancer_id = None + if obj.get('loadbalancer_id'): + loadbalancer_id = obj.get('loadbalancer_id') + elif obj.get('pool'): + pool_id = obj['pool']['id'] + loadbalancer_id = obj['pool']['loadbalancer_id'] + elif obj.get('listener'): + listener_id = obj['listener']['id'] + loadbalancer_id = obj['listener']['loadbalancer_id'] + elif obj.get('policy') and obj['policy'].get('listener'): + policy_id = obj['policy']['id'] + listener_id = obj['policy']['listener']['id'] + loadbalancer_id = obj['policy']['listener'][ + 'loadbalancer_id'] + + if loadbalancer_id: + status_dict[constants.LOADBALANCERS] = [{ + 'id': loadbalancer_id, + constants.PROVISIONING_STATUS: parent_prov_status, + constants.OPERATING_STATUS: op_status}] + if listener_id: + status_dict[constants.LISTENERS] = [{ + 'id': listener_id, + constants.PROVISIONING_STATUS: parent_prov_status, + constants.OPERATING_STATUS: op_status}] + if pool_id: + status_dict[constants.POOLS] = [{ + 'id': pool_id, + constants.PROVISIONING_STATUS: parent_prov_status, + constants.OPERATING_STATUS: op_status}] + if policy_id: + status_dict[constants.L7POLICIES] = [{ + 'id': policy_id, + constants.PROVISIONING_STATUS: parent_prov_status, + constants.OPERATING_STATUS: op_status}] + + kw = {'status': status_dict} + self.client.cast({}, 'update_loadbalancer_status', **kw) + + return completor_func + + def update_listener_statistics(self, statistics): + kw = {'statistics': statistics} + self.client.cast({}, 'update_listener_statistics', **kw) + + @log_helpers.log_method_call + def loadbalancer_create(self, ctxt, loadbalancer): + ctx = neutron_context.Context(None, loadbalancer['project_id']) + self.loadbalancer.create( + ctx, loadbalancer, + self.get_completor_func(constants.LOADBALANCERS, + loadbalancer)) + + @log_helpers.log_method_call + def loadbalancer_delete(self, ctxt, loadbalancer, cascade=False): + ctx = neutron_context.Context(None, loadbalancer['project_id']) + # TODO(asarfaty): No support for cascade. It is blocked by the driver + self.loadbalancer.delete( + ctx, loadbalancer, + self.get_completor_func(constants.LOADBALANCERS, + loadbalancer, + delete=True)) + + @log_helpers.log_method_call + def loadbalancer_update(self, ctxt, old_loadbalancer, new_loadbalancer): + ctx = neutron_context.Context(None, old_loadbalancer['project_id']) + self.loadbalancer.update( + ctx, old_loadbalancer, new_loadbalancer, + self.get_completor_func(constants.LOADBALANCERS, + new_loadbalancer)) + + # Listener + @log_helpers.log_method_call + def listener_create(self, ctxt, listener, cert): + ctx = neutron_context.Context(None, listener['project_id']) + self.listener.create(ctx, listener, + self.get_completor_func(constants.LISTENERS, + listener), + certificate=cert) + + @log_helpers.log_method_call + def listener_delete(self, ctxt, listener): + ctx = neutron_context.Context(None, listener['project_id']) + self.listener.delete(ctx, listener, + self.get_completor_func(constants.LISTENERS, + listener, + delete=True)) + + @log_helpers.log_method_call + def listener_update(self, ctxt, old_listener, new_listener, cert): + ctx = neutron_context.Context(None, old_listener['project_id']) + self.listener.update(ctx, old_listener, new_listener, + self.get_completor_func(constants.LISTENERS, + new_listener), + certificate=cert) + + # Pool + @log_helpers.log_method_call + def pool_create(self, ctxt, pool): + ctx = neutron_context.Context(None, pool['project_id']) + self.pool.create(ctx, pool, self.get_completor_func(constants.POOLS, + pool)) + + @log_helpers.log_method_call + def pool_delete(self, ctxt, pool): + ctx = neutron_context.Context(None, pool['project_id']) + self.pool.delete(ctx, pool, self.get_completor_func(constants.POOLS, + pool, + delete=True)) + + @log_helpers.log_method_call + def pool_update(self, ctxt, old_pool, new_pool): + ctx = neutron_context.Context(None, old_pool['project_id']) + self.pool.update(ctx, old_pool, new_pool, + self.get_completor_func(constants.POOLS, new_pool)) + + # Member + @log_helpers.log_method_call + def member_create(self, ctxt, member): + ctx = neutron_context.Context(None, member['project_id']) + self.member.create(ctx, member, + self.get_completor_func(constants.MEMBERS, + member)) + + @log_helpers.log_method_call + def member_delete(self, ctxt, member): + ctx = neutron_context.Context(None, member['project_id']) + self.member.delete(ctx, member, + self.get_completor_func(constants.MEMBERS, + member, + delete=True)) + + @log_helpers.log_method_call + def member_update(self, ctxt, old_member, new_member): + ctx = neutron_context.Context(None, old_member['project_id']) + self.member.update(ctx, old_member, new_member, + self.get_completor_func(constants.MEMBERS, + new_member)) + + # Health Monitor + @log_helpers.log_method_call + def healthmonitor_create(self, ctxt, healthmonitor): + ctx = neutron_context.Context(None, healthmonitor['project_id']) + self.healthmonitor.create(ctx, healthmonitor, + self.get_completor_func( + constants.HEALTHMONITORS, healthmonitor)) + + @log_helpers.log_method_call + def healthmonitor_delete(self, ctxt, healthmonitor): + ctx = neutron_context.Context(None, healthmonitor['project_id']) + self.healthmonitor.delete(ctx, healthmonitor, + self.get_completor_func( + constants.HEALTHMONITORS, healthmonitor, + delete=True)) + + @log_helpers.log_method_call + def healthmonitor_update(self, ctxt, old_healthmonitor, new_healthmonitor): + ctx = neutron_context.Context(None, old_healthmonitor['project_id']) + self.healthmonitor.update(ctx, old_healthmonitor, new_healthmonitor, + self.get_completor_func( + constants.HEALTHMONITORS, + new_healthmonitor)) + + # L7 Policy + @log_helpers.log_method_call + def l7policy_create(self, ctxt, l7policy): + ctx = neutron_context.Context(None, l7policy['project_id']) + self.l7policy.create(ctx, l7policy, + self.get_completor_func(constants.L7POLICIES, + l7policy)) + + @log_helpers.log_method_call + def l7policy_delete(self, ctxt, l7policy): + ctx = neutron_context.Context(None, l7policy['project_id']) + self.l7policy.delete(ctx, l7policy, + self.get_completor_func(constants.L7POLICIES, + l7policy, + delete=True)) + + @log_helpers.log_method_call + def l7policy_update(self, ctxt, old_l7policy, new_l7policy): + ctx = neutron_context.Context(None, old_l7policy['project_id']) + self.l7policy.update(ctx, old_l7policy, new_l7policy, + self.get_completor_func(constants.L7POLICIES, + new_l7policy)) + + # L7 Rule + @log_helpers.log_method_call + def l7rule_create(self, ctxt, l7rule): + ctx = neutron_context.Context(None, l7rule['project_id']) + self.l7rule.create(ctx, l7rule, + self.get_completor_func(constants.L7RULES, l7rule)) + + @log_helpers.log_method_call + def l7rule_delete(self, ctxt, l7rule): + ctx = neutron_context.Context(None, l7rule['project_id']) + self.l7rule.delete(ctx, l7rule, + self.get_completor_func(constants.L7RULES, + l7rule, + delete=True)) + + @log_helpers.log_method_call + def l7rule_update(self, ctxt, old_l7rule, new_l7rule): + ctx = neutron_context.Context(None, old_l7rule['project_id']) + self.l7rule.update(ctx, old_l7rule, new_l7rule, + self.get_completor_func(constants.L7RULES, + new_l7rule)) + + +class NSXOctaviaStatisticsCollector(object): + def __init__(self, core_plugin, listener_stats_getter): + self.core_plugin = core_plugin + self.listener_stats_getter = listener_stats_getter + if cfg.CONF.octavia_stats_interval: + eventlet.spawn_n(self.thread_runner, + cfg.CONF.octavia_stats_interval) + + @log_helpers.log_method_call + def thread_runner(self, interval): + while True: + time.sleep(interval) + self.collect() + + def _get_nl_loadbalancers(self, context): + """Getting the list of neutron-lbaas loadbalancers + + This is done directly from the neutron-lbaas DB to also support the + case that the plugin is currently unavailable, but entries already + exist on the DB. + """ + nl_loadbalancers = context.session.query(models.LoadBalancer).all() + return [lb.id for lb in nl_loadbalancers] + + @log_helpers.log_method_call + def collect(self): + if not self.core_plugin.octavia_listener: + return + + endpoint = self.core_plugin.octavia_listener.endpoints[0] + context = neutron_context.get_admin_context() + + # get the statistics of all the Octavia loadbalancers/listeners while + # ignoring the neutron-lbaas loadbalancers. + # Note(asarfaty): The Octavia plugin/DB is unavailable from the + # neutron context, so there is no option to query the Octavia DB for + # the relevant loadbalancers. + nl_loadbalancers = self._get_nl_loadbalancers(context) + listeners_stats = self.listener_stats_getter( + context, self.core_plugin, ignore_list=nl_loadbalancers) + if not listeners_stats: + # Avoid sending empty stats + return + stats = {'listeners': listeners_stats} + endpoint.update_listener_statistics(stats) diff --git a/vmware_nsx/tests/unit/services/lbaas/test_nsxv3_driver.py b/vmware_nsx/tests/unit/services/lbaas/test_nsxv3_driver.py index e6fe395c5d..7ec9fd6d4c 100644 --- a/vmware_nsx/tests/unit/services/lbaas/test_nsxv3_driver.py +++ b/vmware_nsx/tests/unit/services/lbaas/test_nsxv3_driver.py @@ -293,7 +293,8 @@ class TestEdgeLbaasV2Loadbalancer(BaseTestEdgeLbaasV2): ) as mock_get_pool_binding, \ mock.patch.object(self.pool_client, 'get' ) as mock_get_pool, \ - mock.patch.object(nsx_db, 'get_nsx_lbaas_listener_binding_by_vs' + mock.patch.object(nsx_db, + 'get_nsx_lbaas_listener_binding_by_lb_and_vs' ) as mock_get_listener_binding: mock_get_lb_binding.return_value = LB_BINDING mock_get_pool_binding.return_value = POOL_BINDING diff --git a/vmware_nsx/tests/unit/services/lbaas/test_octavia_driver.py b/vmware_nsx/tests/unit/services/lbaas/test_octavia_driver.py new file mode 100644 index 0000000000..67e91fc56e --- /dev/null +++ b/vmware_nsx/tests/unit/services/lbaas/test_octavia_driver.py @@ -0,0 +1,477 @@ +# Copyright 2018 VMware, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import decorator +import mock +import testtools + +from oslo_utils import uuidutils + +code_ok = True +# Skip duplications between Octavia & Neutron configurations +with mock.patch('oslo_config.cfg.ConfigOpts.import_group'): + try: + from octavia.api.drivers import data_models + from vmware_nsx.services.lbaas.octavia import octavia_driver as driver + except ImportError: + # Octavia code not found + # this can happen as Octavia is not in the requirements yet + code_ok = False + +DRIVER = 'vmware_nsx.services.lbaas.octavia.octavia_driver.NSXOctaviaDriver' + + +class TestNsxProviderDriver(testtools.TestCase): + """Test the NSX Octavia driver + + Make sure all the relevant data is translated and sent to the listener + """ + def setUp(self): + super(TestNsxProviderDriver, self).setUp() + global code_ok + if not code_ok: + return + # init the NSX driver without the RPC & certificate + with mock.patch(DRIVER + '._init_rpc_messaging'), \ + mock.patch(DRIVER + '._init_rpc_listener'), \ + mock.patch(DRIVER + '._init_cert_manager'): + self.driver = driver.NSXOctaviaDriver() + self.driver.client = mock.Mock() + + self.loadbalancer_id = uuidutils.generate_uuid() + self.vip_address = '192.0.2.10' + self.vip_network_id = uuidutils.generate_uuid() + self.vip_port_id = uuidutils.generate_uuid() + self.vip_subnet_id = uuidutils.generate_uuid() + self.listener_id = uuidutils.generate_uuid() + self.pool_id = uuidutils.generate_uuid() + self.member_id = uuidutils.generate_uuid() + self.member_subnet_id = uuidutils.generate_uuid() + self.healthmonitor_id = uuidutils.generate_uuid() + self.l7policy_id = uuidutils.generate_uuid() + self.l7rule_id = uuidutils.generate_uuid() + self.project_id = uuidutils.generate_uuid() + self.default_tls_container_ref = uuidutils.generate_uuid() + self.sni_container_ref_1 = uuidutils.generate_uuid() + self.sni_container_ref_2 = uuidutils.generate_uuid() + + self.ref_member = data_models.Member( + address='198.51.100.4', + admin_state_up=True, + member_id=self.member_id, + monitor_address='203.0.113.2', + monitor_port=66, + name='jacket', + pool_id=self.pool_id, + protocol_port=99, + subnet_id=self.member_subnet_id, + weight=55) + + self.ref_healthmonitor = data_models.HealthMonitor( + admin_state_up=False, + delay=2, + expected_codes="500", + healthmonitor_id=self.healthmonitor_id, + http_method='TRACE', + max_retries=1, + max_retries_down=0, + name='doc', + pool_id=self.pool_id, + timeout=3, + type='PHD', + url_path='/index.html') + + self.ref_pool = data_models.Pool( + admin_state_up=True, + description='Olympic swimming pool', + healthmonitor=self.ref_healthmonitor, + lb_algorithm='A_Fast_One', + loadbalancer_id=self.loadbalancer_id, + members=[self.ref_member], + name='Osborn', + pool_id=self.pool_id, + protocol='avian', + session_persistence={'type': 'glue'}) + + self.ref_l7rule = data_models.L7Rule( + admin_state_up=True, + compare_type='store_brand', + invert=True, + key='board', + l7policy_id=self.l7policy_id, + l7rule_id=self.l7rule_id, + type='strict', + value='gold') + + self.ref_l7policy = data_models.L7Policy( + action='packed', + admin_state_up=False, + description='Corporate policy', + l7policy_id=self.l7policy_id, + listener_id=self.listener_id, + name='more_policy', + position=1, + redirect_pool_id=self.pool_id, + redirect_url='/hr', + rules=[self.ref_l7rule]) + + self.ref_listener = data_models.Listener( + admin_state_up=False, + connection_limit=5, + default_pool=self.ref_pool, + default_pool_id=self.pool_id, + default_tls_container_data='default_cert_data', + default_tls_container_ref=self.default_tls_container_ref, + description='The listener', + insert_headers={'X-Forwarded-For': 'true'}, + l7policies=[self.ref_l7policy], + listener_id=self.listener_id, + loadbalancer_id=self.loadbalancer_id, + name='super_listener', + protocol='avian', + protocol_port=42, + sni_container_data=['sni_cert_data_1', 'sni_cert_data_2'], + sni_container_refs=[self.sni_container_ref_1, + self.sni_container_ref_2]) + + self.ref_lb = data_models.LoadBalancer( + admin_state_up=False, + description='One great load balancer', + flavor={'cake': 'chocolate'}, + listeners=[self.ref_listener], + loadbalancer_id=self.loadbalancer_id, + name='favorite_lb', + project_id=self.project_id, + vip_address=self.vip_address, + vip_network_id=self.vip_network_id, + vip_port_id=self.vip_port_id, + vip_subnet_id=self.vip_subnet_id) + + # start DB mocks + mock.patch('octavia.db.api.get_session').start() + mock.patch("octavia.api.drivers.utils.db_pool_to_provider_pool", + return_value=self.ref_pool).start() + + @decorator.decorator + def skip_no_octavia(f, *args, **kwargs): + global code_ok + if not code_ok: + obj = args[0] + return obj.skipTest('Octavia code not found') + return f(*args, **kwargs) + + @skip_no_octavia + def test_loadbalancer_create(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.loadbalancer_create(self.ref_lb) + cast_method.assert_called_with({}, 'loadbalancer_create', + loadbalancer=mock.ANY) + driver_obj = cast_method.call_args[1]['loadbalancer'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + self.assertIn('admin_state_up', driver_obj) + self.assertIn('name', driver_obj) + self.assertIn('listeners', driver_obj) + self.assertEqual(1, len(driver_obj['listeners'])) + self.assertEqual(self.ref_lb.vip_address, + driver_obj['vip_address']) + self.assertEqual(self.ref_lb.vip_network_id, + driver_obj['vip_network_id']) + self.assertEqual(self.ref_lb.vip_port_id, + driver_obj['vip_port_id']) + self.assertEqual(self.ref_lb.vip_subnet_id, + driver_obj['vip_subnet_id']) + + @skip_no_octavia + def test_loadbalancer_delete(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.loadbalancer_delete(self.ref_lb) + cast_method.assert_called_with({}, 'loadbalancer_delete', + cascade=False, + loadbalancer=mock.ANY) + driver_obj = cast_method.call_args[1]['loadbalancer'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_loadbalancer_update(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.loadbalancer_update(self.ref_lb, self.ref_lb) + cast_method.assert_called_with({}, 'loadbalancer_update', + old_loadbalancer=mock.ANY, + new_loadbalancer=mock.ANY) + driver_obj = cast_method.call_args[1]['new_loadbalancer'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_listener_create(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.listener_create(self.ref_listener) + cast_method.assert_called_with({}, 'listener_create', cert=None, + listener=mock.ANY) + driver_obj = cast_method.call_args[1]['listener'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + self.assertIn('admin_state_up', driver_obj) + self.assertIn('name', driver_obj) + self.assertIn('loadbalancer_id', driver_obj) + self.assertIn('loadbalancer', driver_obj) + self.assertEqual(self.ref_listener.protocol, + driver_obj['protocol']) + self.assertEqual(self.ref_listener.protocol_port, + driver_obj['protocol_port']) + self.assertEqual(self.ref_listener.connection_limit, + driver_obj['connection_limit']) + self.assertIn('l7policies', driver_obj) + #TODO(asarfaty) add after the driver is fixed + #self.assertIn('default_tls_container_id', driver_obj) + + @skip_no_octavia + def test_listener_delete(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.listener_delete(self.ref_listener) + cast_method.assert_called_with({}, 'listener_delete', + listener=mock.ANY) + driver_obj = cast_method.call_args[1]['listener'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_listener_update(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.listener_update(self.ref_listener, self.ref_listener) + cast_method.assert_called_with({}, 'listener_update', cert=None, + old_listener=mock.ANY, + new_listener=mock.ANY) + driver_obj = cast_method.call_args[1]['new_listener'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_pool_create(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.pool_create(self.ref_pool) + cast_method.assert_called_with({}, 'pool_create', pool=mock.ANY) + driver_obj = cast_method.call_args[1]['pool'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + self.assertIn('admin_state_up', driver_obj) + self.assertIn('name', driver_obj) + self.assertIn('loadbalancer_id', driver_obj) + self.assertIn('listener', driver_obj) + self.assertIn('listeners', driver_obj) + self.assertEqual(self.ref_pool.lb_algorithm, + driver_obj['lb_algorithm']) + self.assertEqual(self.ref_pool.session_persistence, + driver_obj['session_persistence']) + self.assertIn('members', driver_obj) + + @skip_no_octavia + def test_pool_delete(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.pool_delete(self.ref_pool) + cast_method.assert_called_with({}, 'pool_delete', pool=mock.ANY) + driver_obj = cast_method.call_args[1]['pool'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_pool_update(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.pool_update(self.ref_pool, self.ref_pool) + cast_method.assert_called_with({}, 'pool_update', + old_pool=mock.ANY, + new_pool=mock.ANY) + driver_obj = cast_method.call_args[1]['new_pool'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_member_create(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.member_create(self.ref_member) + cast_method.assert_called_with({}, 'member_create', + member=mock.ANY) + driver_obj = cast_method.call_args[1]['member'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + self.assertIn('admin_state_up', driver_obj) + self.assertIn('name', driver_obj) + self.assertEqual(self.pool_id, driver_obj['pool_id']) + self.assertIn('pool', driver_obj) + self.assertIn('loadbalancer', driver_obj['pool']) + #TODO(asarfaty) add when the driver is fixed + #self.assertIn('listener', driver_obj['pool']) + self.assertEqual(self.ref_member.subnet_id, + driver_obj['subnet_id']) + self.assertEqual(self.ref_member.address, + driver_obj['address']) + self.assertEqual(self.ref_member.protocol_port, + driver_obj['protocol_port']) + self.assertEqual(self.ref_member.weight, + driver_obj['weight']) + + @skip_no_octavia + def test_member_delete(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.member_delete(self.ref_member) + cast_method.assert_called_with({}, 'member_delete', + member=mock.ANY) + driver_obj = cast_method.call_args[1]['member'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_member_update(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.member_update(self.ref_member, self.ref_member) + cast_method.assert_called_with({}, 'member_update', + old_member=mock.ANY, + new_member=mock.ANY) + driver_obj = cast_method.call_args[1]['old_member'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_health_monitor_create(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.health_monitor_create(self.ref_healthmonitor) + cast_method.assert_called_with({}, 'healthmonitor_create', + healthmonitor=mock.ANY) + driver_obj = cast_method.call_args[1]['healthmonitor'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + self.assertIn('admin_state_up', driver_obj) + self.assertIn('name', driver_obj) + self.assertEqual(self.ref_healthmonitor.type, + driver_obj['type']) + self.assertEqual(self.ref_healthmonitor.url_path, + driver_obj['url_path']) + self.assertEqual(self.ref_healthmonitor.delay, + driver_obj['delay']) + self.assertEqual(self.ref_healthmonitor.timeout, + driver_obj['timeout']) + self.assertEqual(self.ref_healthmonitor.max_retries, + driver_obj['max_retries']) + self.assertEqual(self.ref_healthmonitor.http_method, + driver_obj['http_method']) + self.assertIn('pool', driver_obj) + self.assertEqual(self.pool_id, + driver_obj['pool']['id']) + self.assertEqual(self.loadbalancer_id, + driver_obj['pool']['loadbalancer_id']) + + @skip_no_octavia + def test_health_monitor_delete(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.health_monitor_delete(self.ref_healthmonitor) + cast_method.assert_called_with({}, 'healthmonitor_delete', + healthmonitor=mock.ANY) + driver_obj = cast_method.call_args[1]['healthmonitor'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_health_monitor_update(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.health_monitor_update(self.ref_healthmonitor, + self.ref_healthmonitor) + cast_method.assert_called_with({}, 'healthmonitor_update', + old_healthmonitor=mock.ANY, + new_healthmonitor=mock.ANY) + driver_obj = cast_method.call_args[1]['new_healthmonitor'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_l7policy_create(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.l7policy_create(self.ref_l7policy) + cast_method.assert_called_with({}, 'l7policy_create', + l7policy=mock.ANY) + driver_obj = cast_method.call_args[1]['l7policy'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + self.assertIn('admin_state_up', driver_obj) + self.assertIn('name', driver_obj) + self.assertIn('listener', driver_obj) + self.assertEqual(self.listener_id, driver_obj['listener_id']) + self.assertIn('rules', driver_obj) + self.assertIn('position', driver_obj) + self.assertEqual(self.ref_l7policy.action, driver_obj['action']) + self.assertEqual(self.ref_l7policy.redirect_url, + driver_obj['redirect_url']) + self.assertEqual(self.ref_l7policy.redirect_pool_id, + driver_obj['redirect_pool_id']) + + @skip_no_octavia + def test_l7policy_delete(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.l7policy_delete(self.ref_l7policy) + cast_method.assert_called_with({}, 'l7policy_delete', + l7policy=mock.ANY) + driver_obj = cast_method.call_args[1]['l7policy'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_l7policy_update(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.l7policy_update(self.ref_l7policy, self.ref_l7policy) + cast_method.assert_called_with({}, 'l7policy_update', + old_l7policy=mock.ANY, + new_l7policy=mock.ANY) + driver_obj = cast_method.call_args[1]['new_l7policy'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_l7rule_create(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.l7rule_create(self.ref_l7rule) + cast_method.assert_called_with({}, 'l7rule_create', + l7rule=mock.ANY) + driver_obj = cast_method.call_args[1]['l7rule'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + self.assertIn('admin_state_up', driver_obj) + self.assertIn('name', driver_obj) + self.assertIn('policy', driver_obj) + self.assertIn('rules', driver_obj['policy']) + self.assertEqual(self.ref_l7rule.type, driver_obj['type']) + self.assertEqual(self.ref_l7rule.value, driver_obj['value']) + self.assertEqual(self.ref_l7rule.invert, driver_obj['invert']) + + @skip_no_octavia + def test_l7rule_delete(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.l7rule_delete(self.ref_l7rule) + cast_method.assert_called_with({}, 'l7rule_delete', + l7rule=mock.ANY) + driver_obj = cast_method.call_args[1]['l7rule'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) + + @skip_no_octavia + def test_l7rule_update(self): + with mock.patch.object(self.driver.client, 'cast') as cast_method: + self.driver.l7rule_update(self.ref_l7rule, self.ref_l7rule) + cast_method.assert_called_with({}, 'l7rule_update', + old_l7rule=mock.ANY, + new_l7rule=mock.ANY) + driver_obj = cast_method.call_args[1]['new_l7rule'] + self.assertIn('id', driver_obj) + self.assertIn('project_id', driver_obj) diff --git a/vmware_nsx/tests/unit/services/lbaas/test_octavia_listener.py b/vmware_nsx/tests/unit/services/lbaas/test_octavia_listener.py new file mode 100644 index 0000000000..ac3f821b1e --- /dev/null +++ b/vmware_nsx/tests/unit/services/lbaas/test_octavia_listener.py @@ -0,0 +1,301 @@ +# Copyright 2018 VMware, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import testtools + +from oslo_utils import uuidutils + +from vmware_nsx.services.lbaas.octavia import octavia_listener + + +class DummyOctaviaResource(object): + create_called = False + update_called = False + delete_called = False + + def create(self, ctx, lb_obj, completor_func, **args): + self.create_called = True + completor_func(success=True) + + def update(self, ctx, old_lb_obj, new_lb_obj, completor_func, **args): + self.update_called = True + completor_func(success=True) + + def delete(self, ctx, lb_obj, completor_func, **args): + self.delete_called = True + completor_func(success=True) + + +class TestNsxOctaviaListener(testtools.TestCase): + """Test the NSX Octavia listener""" + def setUp(self): + super(TestNsxOctaviaListener, self).setUp() + self.dummyResource = DummyOctaviaResource() + self.clientMock = mock.Mock() + self.clientMock.cast = mock.Mock() + + self.endpoint = octavia_listener.NSXOctaviaListenerEndpoint( + client=self.clientMock, + loadbalancer=self.dummyResource, + listener=self.dummyResource, + pool=self.dummyResource, + member=self.dummyResource, + healthmonitor=self.dummyResource, + l7policy=self.dummyResource, + l7rule=self.dummyResource) + self.dummyObj = {'project_id': uuidutils.generate_uuid(), + 'id': uuidutils.generate_uuid()} + self.ctx = None + self.mock_ctx = mock.patch("neutron_lib.context.Context") + self.mock_ctx.start() + + def tearDown(self): + self.mock_ctx.stop() + super(TestNsxOctaviaListener, self).tearDown() + + def test_loadbalancer_create(self): + self.dummyResource.create_called = False + self.endpoint.loadbalancer_create(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.create_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'loadbalancers': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_loadbalancer_delete(self): + self.dummyResource.delete_called = False + self.endpoint.loadbalancer_delete(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.delete_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'loadbalancers': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'DELETED', + 'id': mock.ANY}]}) + + def test_loadbalancer_update(self): + self.dummyResource.update_called = False + self.endpoint.loadbalancer_update(self.ctx, self.dummyObj, + self.dummyObj) + self.assertTrue(self.dummyResource.update_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'loadbalancers': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_listener_create(self): + self.dummyResource.create_called = False + self.endpoint.listener_create(self.ctx, self.dummyObj, None) + self.assertTrue(self.dummyResource.create_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'listeners': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_listener_delete(self): + self.dummyResource.delete_called = False + self.endpoint.listener_delete(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.delete_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'listeners': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'DELETED', + 'id': mock.ANY}]}) + + def test_listener_update(self): + self.dummyResource.update_called = False + self.endpoint.listener_update(self.ctx, self.dummyObj, self.dummyObj, + None) + self.assertTrue(self.dummyResource.update_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'listeners': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_pool_create(self): + self.dummyResource.create_called = False + self.endpoint.pool_create(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.create_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'pools': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_pool_delete(self): + self.dummyResource.delete_called = False + self.endpoint.pool_delete(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.delete_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'pools': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'DELETED', + 'id': mock.ANY}]}) + + def test_pool_update(self): + self.dummyResource.update_called = False + self.endpoint.pool_update(self.ctx, self.dummyObj, self.dummyObj) + self.assertTrue(self.dummyResource.update_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'pools': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_member_create(self): + self.dummyResource.create_called = False + self.endpoint.member_create(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.create_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'members': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_member_delete(self): + self.dummyResource.delete_called = False + self.endpoint.member_delete(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.delete_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'members': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'DELETED', + 'id': mock.ANY}]}) + + def test_member_update(self): + self.dummyResource.update_called = False + self.endpoint.member_update(self.ctx, self.dummyObj, self.dummyObj) + self.assertTrue(self.dummyResource.update_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'members': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_healthmonitor_create(self): + self.dummyResource.create_called = False + self.endpoint.healthmonitor_create(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.create_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'healthmonitors': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_healthmonitor_delete(self): + self.dummyResource.delete_called = False + self.endpoint.healthmonitor_delete(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.delete_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'healthmonitors': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'DELETED', + 'id': mock.ANY}]}) + + def test_healthmonitor_update(self): + self.dummyResource.update_called = False + self.endpoint.healthmonitor_update(self.ctx, self.dummyObj, + self.dummyObj) + self.assertTrue(self.dummyResource.update_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'healthmonitors': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_l7policy_create(self): + self.dummyResource.create_called = False + self.endpoint.l7policy_create(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.create_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'l7policies': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_l7policy_delete(self): + self.dummyResource.delete_called = False + self.endpoint.l7policy_delete(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.delete_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'l7policies': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'DELETED', + 'id': mock.ANY}]}) + + def test_l7policy_update(self): + self.dummyResource.update_called = False + self.endpoint.l7policy_update(self.ctx, self.dummyObj, self.dummyObj) + self.assertTrue(self.dummyResource.update_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'l7policies': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_l7rule_create(self): + self.dummyResource.create_called = False + self.endpoint.l7rule_create(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.create_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'l7rules': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) + + def test_l7rule_delete(self): + self.dummyResource.delete_called = False + self.endpoint.l7rule_delete(self.ctx, self.dummyObj) + self.assertTrue(self.dummyResource.delete_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'l7rules': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'DELETED', + 'id': mock.ANY}]}) + + def test_l7rule_update(self): + self.dummyResource.update_called = False + self.endpoint.l7rule_update(self.ctx, self.dummyObj, self.dummyObj) + self.assertTrue(self.dummyResource.update_called) + self.clientMock.cast.assert_called_once_with( + {}, 'update_loadbalancer_status', + status={'l7rules': [ + {'operating_status': 'ONLINE', + 'provisioning_status': 'ACTIVE', + 'id': mock.ANY}]}) diff --git a/vmware_nsx/tests/unit/services/qos/test_nsxv_notification.py b/vmware_nsx/tests/unit/services/qos/test_nsxv_notification.py index e9c854c77b..0579bb6090 100644 --- a/vmware_nsx/tests/unit/services/qos/test_nsxv_notification.py +++ b/vmware_nsx/tests/unit/services/qos/test_nsxv_notification.py @@ -44,8 +44,15 @@ class TestQosNsxVNotification(test_plugin.NsxVPluginV2TestCase, self._init_dvs_config() # Reset the drive to re-create it qos_driver.DRIVER = None - super(TestQosNsxVNotification, self).setUp(plugin=CORE_PLUGIN, - ext_mgr=None) + # Skip Octavia init because of RPC conflicts + with mock.patch("vmware_nsx.services.lbaas.octavia.octavia_listener." + "NSXOctaviaListener.__init__", return_value=None),\ + mock.patch("vmware_nsx.services.lbaas.octavia.octavia_listener." + "NSXOctaviaStatisticsCollector.__init__", + return_value=None): + super(TestQosNsxVNotification, self).setUp(plugin=CORE_PLUGIN, + ext_mgr=None, + with_md_proxy=False) self.setup_coreplugin(CORE_PLUGIN) plugin_instance = directory.get_plugin()