From e94ff2681f18b37b6c288393f85011de0af2175d Mon Sep 17 00:00:00 2001 From: Aishwarya Thangappa Date: Thu, 14 Apr 2016 19:16:02 -0700 Subject: [PATCH] Adds a new feature to limit the amphora build rate This patch limits the number of Amphora build requests handled by the controller worker at a given time. Also, the amphora build requests are assigned priorities based on whether it is a normal loadbalancer create, failover or spares pool loadbalancer create request. Based on the priority and the order in which the requests were made if there is an available build slot the amphora will be built. Co-Authored-By: Lubosz "diltram" Kosnik Change-Id: I967cf0668f82fb3a63e18dc7a457c58b526b7e66 Closes-Bug: #1571802 --- etc/octavia.conf | 3 + octavia/common/config.py | 12 ++ octavia/common/constants.py | 5 + octavia/common/exceptions.py | 6 + .../controller/worker/amphora_rate_limit.py | 96 +++++++++++++ .../controller/worker/controller_worker.py | 16 ++- .../controller/worker/flows/amphora_flows.py | 21 +-- .../controller/worker/tasks/compute_tasks.py | 22 ++- ..._create_amphora_build_rate_limit_tables.py | 64 +++++++++ octavia/db/models.py | 18 +++ octavia/db/repositories.py | 69 ++++++++++ .../tests/functional/db/test_repositories.py | 2 +- .../worker/flows/test_amphora_flows.py | 26 ++-- .../worker/flows/test_load_balancer_flows.py | 4 +- .../worker/tasks/test_compute_tasks.py | 92 +++++++++---- .../worker/test_amphora_rate_limit.py | 129 ++++++++++++++++++ .../worker/test_controller_worker.py | 22 ++- 17 files changed, 539 insertions(+), 68 deletions(-) create mode 100644 octavia/controller/worker/amphora_rate_limit.py create mode 100644 octavia/db/migration/alembic_migrations/versions/fc5582da7d8a_create_amphora_build_rate_limit_tables.py create mode 100644 octavia/tests/unit/controller/worker/test_amphora_rate_limit.py diff --git a/etc/octavia.conf b/etc/octavia.conf index 0492907dda..02053f2096 100644 --- a/etc/octavia.conf +++ b/etc/octavia.conf @@ -116,6 +116,9 @@ # haproxy_template = # connection_max_retries = 300 # connection_retry_interval = 5 +# build_rate_limit = -1 +# build_active_retries = 300 +# build_retry_interval = 5 # user_group = nogroup # Maximum number of entries that can fit in the stick table. diff --git a/octavia/common/config.py b/octavia/common/config.py index d108f48f5f..a42961b108 100644 --- a/octavia/common/config.py +++ b/octavia/common/config.py @@ -160,6 +160,18 @@ haproxy_amphora_opts = [ default=5, help=_('Retry timeout between connection attempts in ' 'seconds.')), + cfg.IntOpt('build_rate_limit', + default=-1, + help=_('Number of amphorae that could be built per controller' + 'worker, simultaneously.')), + cfg.IntOpt('build_active_retries', + default=300, + help=_('Retry threshold for waiting for a build slot for ' + 'an amphorae.')), + cfg.IntOpt('build_retry_interval', + default=5, + help=_('Retry timeout between build attempts in ' + 'seconds.')), cfg.StrOpt('user_group', default='nogroup', help=_('The user group for haproxy to run under inside the ' diff --git a/octavia/common/constants.py b/octavia/common/constants.py index 34b47ffc1e..73a07ec02a 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -258,6 +258,11 @@ SERVICE_AUTH = 'service_auth' RPC_NAMESPACE_CONTROLLER_AGENT = 'controller' +# Build Type Priority +LB_CREATE_FAILOVER_PRIORITY = 20 +LB_CREATE_NORMAL_PRIORITY = 40 +LB_CREATE_SPARES_POOL_PRIORITY = 60 +BUILD_TYPE_PRIORITY = 'build_type_priority' # Active standalone roles and topology TOPOLOGY_SINGLE = 'SINGLE' diff --git a/octavia/common/exceptions.py b/octavia/common/exceptions.py index 6014f6bf03..735ed0013b 100644 --- a/octavia/common/exceptions.py +++ b/octavia/common/exceptions.py @@ -20,6 +20,8 @@ Octavia base exception handling. from oslo_utils import excutils from webob import exc +from octavia.i18n import _LE + class OctaviaException(Exception): """Base Octavia Exception. @@ -157,6 +159,10 @@ class ComputeBuildException(OctaviaException): message = _('Failed to build compute instance.') +class ComputeBuildQueueTimeoutException(OctaviaException): + message = _LE('Failed to get an amphora build slot.') + + class ComputeDeleteException(OctaviaException): message = _('Failed to delete compute instance.') diff --git a/octavia/controller/worker/amphora_rate_limit.py b/octavia/controller/worker/amphora_rate_limit.py new file mode 100644 index 0000000000..b7b9b2a7d1 --- /dev/null +++ b/octavia/controller/worker/amphora_rate_limit.py @@ -0,0 +1,96 @@ +# Copyright 2016 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import time + +from oslo_config import cfg +from oslo_log import log as logging + +from octavia.common import exceptions +from octavia.db import api as db_apis +from octavia.db import repositories as repo + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF +CONF.import_group('haproxy_amphora', 'octavia.common.config') + + +class AmphoraBuildRateLimit(object): + + def __init__(self): + self.amp_build_slots_repo = repo.AmphoraBuildSlotsRepository() + self.amp_build_req_repo = repo.AmphoraBuildReqRepository() + + def add_to_build_request_queue(self, amphora_id, build_priority): + self.amp_build_req_repo.add_to_build_queue( + db_apis.get_session(), + amphora_id=amphora_id, + priority=build_priority) + LOG.debug("Added build request for %s to the queue", amphora_id) + self.wait_for_build_slot(amphora_id) + + def has_build_slot(self): + build_rate_limit = CONF.haproxy_amphora.build_rate_limit + session = db_apis.get_session() + with session.begin(subtransactions=True): + used_build_slots = (self.amp_build_slots_repo + .get_used_build_slots_count(session)) + available_build_slots = build_rate_limit - used_build_slots + LOG.debug("Available build slots %d", available_build_slots) + return available_build_slots > 0 + + def has_highest_priority(self, amphora_id): + session = db_apis.get_session() + with session.begin(subtransactions=True): + highest_priority_build_req = ( + self.amp_build_req_repo.get_highest_priority_build_req( + session)) + LOG.debug("Highest priority req: %s, Current req: %s", + highest_priority_build_req, amphora_id) + return amphora_id == highest_priority_build_req + + def update_build_status_and_available_build_slots(self, amphora_id): + session = db_apis.get_session() + with session.begin(subtransactions=True): + self.amp_build_slots_repo.update_count(session, action='increment') + self.amp_build_req_repo.update_req_status(session, amphora_id) + + def remove_from_build_req_queue(self, amphora_id): + session = db_apis.get_session() + with session.begin(subtransactions=True): + self.amp_build_req_repo.delete(session, amphora_id=amphora_id) + self.amp_build_slots_repo.update_count(session, action='decrement') + LOG.debug("Removed request for %s from queue" + " and released the build slot", amphora_id) + + def remove_all_from_build_req_queue(self): + session = db_apis.get_session() + with session.begin(subtransactions=True): + self.amp_build_req_repo.delete_all(session) + self.amp_build_slots_repo.update_count(session, action='reset') + LOG.debug("Removed all the build requests and " + "released the build slots") + + def wait_for_build_slot(self, amphora_id): + LOG.debug("Waiting for a build slot") + for i in range(CONF.haproxy_amphora.build_active_retries): + if (self.has_build_slot() and + self.has_highest_priority(amphora_id)): + self.update_build_status_and_available_build_slots(amphora_id) + return + time.sleep(CONF.haproxy_amphora.build_retry_interval) + self.remove_all_from_build_req_queue() + raise exceptions.ComputeBuildQueueTimeoutException() diff --git a/octavia/controller/worker/controller_worker.py b/octavia/controller/worker/controller_worker.py index 82316dd322..9b4d632f93 100644 --- a/octavia/controller/worker/controller_worker.py +++ b/octavia/controller/worker/controller_worker.py @@ -80,8 +80,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): :returns: amphora_id """ - create_amp_tf = self._taskflow_load(self._amphora_flows. - get_create_amphora_flow()) + create_amp_tf = self._taskflow_load( + self._amphora_flows.get_create_amphora_flow(), + store={constants.BUILD_TYPE_PRIORITY: + constants.LB_CREATE_SPARES_POOL_PRIORITY} + ) with tf_logging.DynamicLoggingListener( create_amp_tf, log=LOG, hide_inputs_outputs_of=self._exclude_result_logging_tasks): @@ -261,7 +264,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): :raises NoSuitableAmphoraException: Unable to allocate an Amphora. """ - store = {constants.LOADBALANCER_ID: load_balancer_id} + store = {constants.LOADBALANCER_ID: load_balancer_id, + constants.BUILD_TYPE_PRIORITY: + constants.LB_CREATE_NORMAL_PRIORITY} topology = CONF.controller_worker.loadbalancer_topology @@ -622,7 +627,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): self._amphora_flows.get_failover_flow(role=amp.role, status=amp.status), store={constants.FAILED_AMPHORA: amp, - constants.LOADBALANCER_ID: amp.load_balancer_id}) + constants.LOADBALANCER_ID: amp.load_balancer_id, + constants.BUILD_TYPE_PRIORITY: + constants.LB_CREATE_FAILOVER_PRIORITY + }) with tf_logging.DynamicLoggingListener( failover_amphora_tf, log=LOG, hide_inputs_outputs_of=self._exclude_result_logging_tasks): diff --git a/octavia/controller/worker/flows/amphora_flows.py b/octavia/controller/worker/flows/amphora_flows.py index e4112346f7..0cbeabb950 100644 --- a/octavia/controller/worker/flows/amphora_flows.py +++ b/octavia/controller/worker/flows/amphora_flows.py @@ -53,16 +53,17 @@ class AmphoraFlows(object): requires=(constants.AMPHORA_ID, constants.SERVER_PEM))) create_amphora_flow.add(compute_tasks.CertComputeCreate( - requires=(constants.AMPHORA_ID, constants.SERVER_PEM), + requires=(constants.AMPHORA_ID, constants.SERVER_PEM, + constants.BUILD_TYPE_PRIORITY), provides=constants.COMPUTE_ID)) else: create_amphora_flow.add(compute_tasks.ComputeCreate( - requires=constants.AMPHORA_ID, + requires=(constants.AMPHORA_ID, constants.BUILD_TYPE_PRIORITY), provides=constants.COMPUTE_ID)) create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB( requires=(constants.AMPHORA_ID, constants.COMPUTE_ID))) create_amphora_flow.add(compute_tasks.ComputeWait( - requires=constants.COMPUTE_ID, + requires=(constants.COMPUTE_ID, constants.AMPHORA_ID), provides=constants.COMPUTE_OBJ)) create_amphora_flow.add(database_tasks.UpdateAmphoraInfo( requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), @@ -130,12 +131,14 @@ class AmphoraFlows(object): create_amp_for_lb_subflow.add(compute_tasks.CertComputeCreate( name=sf_name + '-' + constants.CERT_COMPUTE_CREATE, requires=(constants.AMPHORA_ID, constants.SERVER_PEM, - constants.SERVER_GROUP_ID), + constants.SERVER_GROUP_ID, + constants.BUILD_TYPE_PRIORITY), provides=constants.COMPUTE_ID)) else: create_amp_for_lb_subflow.add(compute_tasks.CertComputeCreate( name=sf_name + '-' + constants.CERT_COMPUTE_CREATE, - requires=(constants.AMPHORA_ID, constants.SERVER_PEM), + requires=(constants.AMPHORA_ID, constants.SERVER_PEM, + constants.BUILD_TYPE_PRIORITY), provides=constants.COMPUTE_ID)) else: @@ -143,12 +146,14 @@ class AmphoraFlows(object): ) and anti_affinity: create_amp_for_lb_subflow.add(compute_tasks.ComputeCreate( name=sf_name + '-' + constants.COMPUTE_CREATE, - requires=(constants.AMPHORA_ID, constants.SERVER_GROUP_ID), + requires=(constants.AMPHORA_ID, constants.SERVER_GROUP_ID, + constants.BUILD_TYPE_PRIORITY), provides=constants.COMPUTE_ID)) else: create_amp_for_lb_subflow.add(compute_tasks.ComputeCreate( name=sf_name + '-' + constants.COMPUTE_CREATE, - requires=constants.AMPHORA_ID, + requires=(constants.AMPHORA_ID, + constants.BUILD_TYPE_PRIORITY), provides=constants.COMPUTE_ID)) create_amp_for_lb_subflow.add(database_tasks.UpdateAmphoraComputeId( @@ -159,7 +164,7 @@ class AmphoraFlows(object): requires=(constants.AMPHORA_ID, constants.COMPUTE_ID))) create_amp_for_lb_subflow.add(compute_tasks.ComputeWait( name=sf_name + '-' + constants.COMPUTE_WAIT, - requires=constants.COMPUTE_ID, + requires=(constants.COMPUTE_ID, constants.AMPHORA_ID), provides=constants.COMPUTE_OBJ)) create_amp_for_lb_subflow.add(database_tasks.UpdateAmphoraInfo( name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO, diff --git a/octavia/controller/worker/tasks/compute_tasks.py b/octavia/controller/worker/tasks/compute_tasks.py index 4d75177d84..167c2b7f03 100644 --- a/octavia/controller/worker/tasks/compute_tasks.py +++ b/octavia/controller/worker/tasks/compute_tasks.py @@ -26,6 +26,7 @@ from octavia.amphorae.backends.agent import agent_jinja_cfg from octavia.common import constants from octavia.common import exceptions from octavia.common.jinja import user_data_jinja_cfg +from octavia.controller.worker import amphora_rate_limit from octavia.i18n import _LE, _LW CONF = cfg.CONF @@ -42,12 +43,15 @@ class BaseComputeTask(task.Task): name=CONF.controller_worker.compute_driver, invoke_on_load=True ).driver + self.rate_limit = amphora_rate_limit.AmphoraBuildRateLimit() class ComputeCreate(BaseComputeTask): """Create the compute instance for a new amphora.""" - def execute(self, amphora_id, ports=None, config_drive_files=None, + def execute(self, amphora_id, + build_type_priority=constants.LB_CREATE_NORMAL_PRIORITY, + ports=None, config_drive_files=None, server_group_id=None): """Create an amphora @@ -65,6 +69,10 @@ class ComputeCreate(BaseComputeTask): key_name = None if not ssh_access else ssh_key try: + if CONF.haproxy_amphora.build_rate_limit != -1: + self.rate_limit.add_to_build_request_queue( + amphora_id, build_type_priority) + agent_cfg = agent_jinja_cfg.AgentJinjaTemplater() config_drive_files['/etc/octavia/amphora-agent.conf'] = ( agent_cfg.build_agent_config(amphora_id)) @@ -115,8 +123,9 @@ class ComputeCreate(BaseComputeTask): class CertComputeCreate(ComputeCreate): - def execute(self, amphora_id, server_pem, ports=None, - server_group_id=None): + def execute(self, amphora_id, server_pem, + build_type_priority=constants.LB_CREATE_NORMAL_PRIORITY, + ports=None, server_group_id=None): """Create an amphora :returns: an amphora @@ -129,7 +138,8 @@ class CertComputeCreate(ComputeCreate): '/etc/octavia/certs/server.pem': server_pem, '/etc/octavia/certs/client_ca.pem': ca} return super(CertComputeCreate, self).execute( - amphora_id, ports=ports, config_drive_files=config_drive_files, + amphora_id, build_type_priority=build_type_priority, + ports=ports, config_drive_files=config_drive_files, server_group_id=server_group_id) @@ -167,7 +177,7 @@ class ComputeDelete(BaseComputeTask): class ComputeWait(BaseComputeTask): """Wait for the compute driver to mark the amphora active.""" - def execute(self, compute_id): + def execute(self, compute_id, amphora_id): """Wait for the compute driver to mark the amphora active :raises: Generic exception if the amphora is not active @@ -176,6 +186,8 @@ class ComputeWait(BaseComputeTask): for i in range(CONF.controller_worker.amp_active_retries): amp = self.compute.get_amphora(compute_id) if amp.status == constants.ACTIVE: + if CONF.haproxy_amphora.build_rate_limit != -1: + self.rate_limit.remove_from_build_req_queue(amphora_id) return amp elif amp.status == constants.ERROR: raise exceptions.ComputeBuildException() diff --git a/octavia/db/migration/alembic_migrations/versions/fc5582da7d8a_create_amphora_build_rate_limit_tables.py b/octavia/db/migration/alembic_migrations/versions/fc5582da7d8a_create_amphora_build_rate_limit_tables.py new file mode 100644 index 0000000000..690820ce9a --- /dev/null +++ b/octavia/db/migration/alembic_migrations/versions/fc5582da7d8a_create_amphora_build_rate_limit_tables.py @@ -0,0 +1,64 @@ +# Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""create_amphora_build_rate_limit_tables + +Revision ID: fc5582da7d8a +Revises: 443fe6676637 +Create Date: 2016-04-07 19:42:28.171902 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import sql + +# revision identifiers, used by Alembic. +revision = 'fc5582da7d8a' +down_revision = '443fe6676637' + + +def upgrade(): + op.create_table( + u'amphora_build_slots', + sa.Column(u'id', sa.Integer(), primary_key=True), + sa.Column(u'slots_used', sa.Integer(), default=0) + ) + + # Create temporary table for table data seeding + insert_table = sql.table( + u'amphora_build_slots', + sql.column(u'id', sa.Integer), + sql.column(u'slots_used', sa.Integer) + ) + + op.bulk_insert( + insert_table, + [ + {'id': 1, 'slots_used': 0} + ] + ) + + op.create_table( + u'amphora_build_request', + sa.Column(u'amphora_id', sa.String(36), nullable=True, + primary_key=True), + sa.Column(u'priority', sa.Integer()), + sa.Column(u'created_time', sa.DateTime(timezone=True), nullable=False), + sa.Column(u'status', sa.String(16), default='WAITING', nullable=False) + ) + + +def downgrade(): + pass diff --git a/octavia/db/models.py b/octavia/db/models.py index 277ca4bd8a..d6884d2bb2 100644 --- a/octavia/db/models.py +++ b/octavia/db/models.py @@ -84,6 +84,24 @@ class L7PolicyAction(base_models.BASE, base_models.LookupTableMixin): __tablename__ = "l7policy_action" +class AmphoraBuildSlots(base_models.BASE): + + __tablename__ = "amphora_build_slots" + + id = sa.Column(sa.Integer(), primary_key=True) + slots_used = sa.Column(sa.Integer()) + + +class AmphoraBuildRequest(base_models.BASE): + + __tablename__ = "amphora_build_request" + + amphora_id = sa.Column(sa.String(36), nullable=True, primary_key=True) + priority = sa.Column(sa.Integer()) + created_time = sa.Column(sa.DateTime, default=func.now(), nullable=False) + status = sa.Column(sa.String(16), default='WAITING', nullable=False) + + class SessionPersistence(base_models.BASE): __data_model__ = data_models.SessionPersistence diff --git a/octavia/db/repositories.py b/octavia/db/repositories.py index 9bb1694d79..e5b0f7940d 100644 --- a/octavia/db/repositories.py +++ b/octavia/db/repositories.py @@ -142,6 +142,8 @@ class Repositories(object): self.vrrpgroup = VRRPGroupRepository() self.l7rule = L7RuleRepository() self.l7policy = L7PolicyRepository() + self.amp_build_slots = AmphoraBuildSlotsRepository() + self.amp_build_req = AmphoraBuildReqRepository() self.quotas = QuotasRepository() def create_load_balancer_and_vip(self, session, lb_dict, vip_dict): @@ -900,6 +902,73 @@ class AmphoraRepository(BaseRepository): return amp.to_data_model() +class AmphoraBuildReqRepository(BaseRepository): + model_class = models.AmphoraBuildRequest + + def add_to_build_queue(self, session, amphora_id=None, priority=None): + """Adds the build request to the table.""" + with session.begin(subtransactions=True): + model = self.model_class(amphora_id=amphora_id, priority=priority) + session.add(model) + + def update_req_status(self, session, amphora_id=None): + """Updates the request status.""" + with session.begin(subtransactions=True): + (session.query(self.model_class) + .filter_by(amphora_id=amphora_id) + .update({self.model_class.status: 'BUILDING'})) + + def get_highest_priority_build_req(self, session): + """Fetches build request with highest priority and least created_time. + + priority 20 = failover (highest) + priority 40 = create_loadbalancer + priority 60 = sparespool (least) + :param session: A Sql Alchemy database session. + :returns amphora_id corresponding to highest priority and least created + time in 'WAITING' status. + """ + with session.begin(subtransactions=True): + return (session.query(self.model_class.amphora_id) + .order_by(self.model_class.status.desc()) + .order_by(self.model_class.priority.asc()) + .order_by(self.model_class.created_time.asc()) + .first())[0] + + def delete_all(self, session): + "Deletes all the build requests." + with session.begin(subtransactions=True): + session.query(self.model_class).delete() + + +class AmphoraBuildSlotsRepository(BaseRepository): + model_class = models.AmphoraBuildSlots + + def get_used_build_slots_count(self, session): + """Gets the number of build slots in use. + + :returns: Number of current build slots. + """ + with session.begin(subtransactions=True): + count = session.query(self.model_class.slots_used).one() + return count[0] + + def update_count(self, session, action='increment'): + """Increments/Decrements/Resets the number of build_slots used.""" + with session.begin(subtransactions=True): + if action == 'increment': + session.query(self.model_class).filter_by(id=1).update( + {self.model_class.slots_used: + self.get_used_build_slots_count(session) + 1}) + elif action == 'decrement': + session.query(self.model_class).filter_by(id=1).update( + {self.model_class.slots_used: + self.get_used_build_slots_count(session) - 1}) + elif action == 'reset': + session.query(self.model_class).filter_by(id=1).update( + {self.model_class.slots_used: 0}) + + class SNIRepository(BaseRepository): model_class = models.SNI diff --git a/octavia/tests/functional/db/test_repositories.py b/octavia/tests/functional/db/test_repositories.py index d50a3ed3b0..5857e33be6 100644 --- a/octavia/tests/functional/db/test_repositories.py +++ b/octavia/tests/functional/db/test_repositories.py @@ -99,7 +99,7 @@ class AllRepositoriesTest(base.OctaviaDBTestBase): 'session_persistence', 'pool', 'member', 'listener', 'listener_stats', 'amphora', 'sni', 'amphorahealth', 'vrrpgroup', 'l7rule', 'l7policy', - 'quotas') + 'amp_build_slots', 'amp_build_req', 'quotas') for repo_attr in repo_attr_names: single_repo = getattr(self.repos, repo_attr, None) message = ("Class Repositories should have %s instance" diff --git a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py index 1a90e7d63a..11a31b730d 100644 --- a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py @@ -51,7 +51,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.SERVER_PEM, amp_flow.provides) self.assertEqual(5, len(amp_flow.provides)) - self.assertEqual(0, len(amp_flow.requires)) + self.assertEqual(1, len(amp_flow.requires)) def test_get_create_amphora_flow_cert(self, mock_get_net_driver): self.AmpFlow = amphora_flows.AmphoraFlows() @@ -65,7 +65,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.COMPUTE_ID, amp_flow.provides) self.assertEqual(5, len(amp_flow.provides)) - self.assertEqual(0, len(amp_flow.requires)) + self.assertEqual(1, len(amp_flow.requires)) def test_get_create_amphora_for_lb_flow(self, mock_get_net_driver): @@ -83,7 +83,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.SERVER_PEM, amp_flow.provides) self.assertEqual(5, len(amp_flow.provides)) - self.assertEqual(1, len(amp_flow.requires)) + self.assertEqual(2, len(amp_flow.requires)) def test_get_cert_create_amphora_for_lb_flow(self, mock_get_net_driver): @@ -103,7 +103,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.SERVER_PEM, amp_flow.provides) self.assertEqual(5, len(amp_flow.provides)) - self.assertEqual(1, len(amp_flow.requires)) + self.assertEqual(2, len(amp_flow.requires)) def test_get_cert_master_create_amphora_for_lb_flow( self, mock_get_net_driver): @@ -124,7 +124,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.SERVER_PEM, amp_flow.provides) self.assertEqual(5, len(amp_flow.provides)) - self.assertEqual(1, len(amp_flow.requires)) + self.assertEqual(2, len(amp_flow.requires)) def test_get_cert_master_rest_anti_affinity_create_amphora_for_lb_flow( self, mock_get_net_driver): @@ -143,7 +143,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.SERVER_PEM, amp_flow.provides) self.assertEqual(5, len(amp_flow.provides)) - self.assertEqual(2, len(amp_flow.requires)) + self.assertEqual(3, len(amp_flow.requires)) self.conf.config(group="nova", enable_anti_affinity=False) def test_get_cert_backup_create_amphora_for_lb_flow( @@ -164,7 +164,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.SERVER_PEM, amp_flow.provides) self.assertEqual(5, len(amp_flow.provides)) - self.assertEqual(1, len(amp_flow.requires)) + self.assertEqual(2, len(amp_flow.requires)) def test_get_cert_bogus_create_amphora_for_lb_flow( self, mock_get_net_driver): @@ -184,7 +184,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.SERVER_PEM, amp_flow.provides) self.assertEqual(5, len(amp_flow.provides)) - self.assertEqual(1, len(amp_flow.requires)) + self.assertEqual(2, len(amp_flow.requires)) def test_get_cert_backup_rest_anti_affinity_create_amphora_for_lb_flow( self, mock_get_net_driver): @@ -202,7 +202,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.SERVER_PEM, amp_flow.provides) self.assertEqual(5, len(amp_flow.provides)) - self.assertEqual(2, len(amp_flow.requires)) + self.assertEqual(3, len(amp_flow.requires)) self.conf.config(group="nova", enable_anti_affinity=False) def test_get_delete_amphora_flow(self, mock_get_net_driver): @@ -253,7 +253,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.LISTENERS, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(2, len(amp_flow.requires)) + self.assertEqual(3, len(amp_flow.requires)) self.assertEqual(11, len(amp_flow.provides)) amp_flow = self.AmpFlow.get_failover_flow( @@ -273,7 +273,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.LISTENERS, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(2, len(amp_flow.requires)) + self.assertEqual(3, len(amp_flow.requires)) self.assertEqual(11, len(amp_flow.provides)) amp_flow = self.AmpFlow.get_failover_flow( @@ -293,7 +293,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.LISTENERS, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(2, len(amp_flow.requires)) + self.assertEqual(3, len(amp_flow.requires)) self.assertEqual(11, len(amp_flow.provides)) amp_flow = self.AmpFlow.get_failover_flow( @@ -313,7 +313,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.LISTENERS, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(2, len(amp_flow.requires)) + self.assertEqual(3, len(amp_flow.requires)) self.assertEqual(11, len(amp_flow.provides)) def test_get_failover_flow_spare(self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/flows/test_load_balancer_flows.py b/octavia/tests/unit/controller/worker/flows/test_load_balancer_flows.py index 16cd1ad915..bef7501b66 100644 --- a/octavia/tests/unit/controller/worker/flows/test_load_balancer_flows.py +++ b/octavia/tests/unit/controller/worker/flows/test_load_balancer_flows.py @@ -212,7 +212,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, create_flow.provides) - self.assertEqual(2, len(create_flow.requires)) + self.assertEqual(3, len(create_flow.requires)) self.assertEqual(12, len(create_flow.provides), create_flow.provides) @@ -240,6 +240,6 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, create_flow.provides) - self.assertEqual(2, len(create_flow.requires)) + self.assertEqual(3, len(create_flow.requires)) self.assertEqual(12, len(create_flow.provides), create_flow.provides) diff --git a/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py index 4581aad538..f9ac526eb4 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py @@ -59,17 +59,22 @@ _port.id = PORT_ID class TestComputeTasks(base.TestCase): def setUp(self): - conf = oslo_fixture.Config(cfg.CONF) - conf.config(group="controller_worker", amp_flavor_id=AMP_FLAVOR_ID) - conf.config(group="controller_worker", amp_image_id=AMP_IMAGE_ID) - conf.config(group="controller_worker", amp_image_tag=AMP_IMAGE_TAG) - conf.config(group="controller_worker", - amp_ssh_key_name=AMP_SSH_KEY_NAME) - conf.config(group="controller_worker", amp_boot_network_list=AMP_NET) - conf.config(group="controller_worker", amp_active_wait_sec=AMP_WAIT) - conf.config(group="controller_worker", - amp_secgroup_list=AMP_SEC_GROUPS) - conf.config(group="controller_worker", amp_image_owner_id='') + self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) + self.conf.config( + group="controller_worker", amp_flavor_id=AMP_FLAVOR_ID) + self.conf.config( + group="controller_worker", amp_image_id=AMP_IMAGE_ID) + self.conf.config( + group="controller_worker", amp_image_tag=AMP_IMAGE_TAG) + self.conf.config( + group="controller_worker", amp_ssh_key_name=AMP_SSH_KEY_NAME) + self.conf.config( + group="controller_worker", amp_boot_network_list=AMP_NET) + self.conf.config( + group="controller_worker", amp_active_wait_sec=AMP_WAIT) + self.conf.config( + group="controller_worker", amp_secgroup_list=AMP_SEC_GROUPS) + self.conf.config(group="controller_worker", amp_image_owner_id='') _amphora_mock.id = AMPHORA_ID _amphora_mock.status = constants.AMPHORA_ALLOCATED @@ -87,9 +92,8 @@ class TestComputeTasks(base.TestCase): def test_compute_create(self, mock_driver, mock_conf, mock_jinja): image_owner_id = uuidutils.generate_uuid() - conf = oslo_fixture.Config(cfg.CONF) - conf.config(group="controller_worker", - amp_image_owner_id=image_owner_id) + self.conf.config( + group="controller_worker", amp_image_owner_id=image_owner_id) createcompute = compute_tasks.ComputeCreate() @@ -114,6 +118,7 @@ class TestComputeTasks(base.TestCase): user_data=None, server_group_id=SERVER_GRPOUP_ID) + # Make sure it returns the expected compute_id self.assertEqual(COMPUTE_ID, compute_id) # Test that a build exception is raised @@ -138,9 +143,6 @@ class TestComputeTasks(base.TestCase): createcompute.revert(COMPUTE_ID, _amphora_mock.id) - conf.config(group="controller_worker", - amp_image_owner_id='') - @mock.patch('jinja2.Environment.get_template') @mock.patch('octavia.amphorae.backends.agent.' 'agent_jinja_cfg.AgentJinjaTemplater.' @@ -152,8 +154,8 @@ class TestComputeTasks(base.TestCase): def test_compute_create_user_data(self, mock_driver, mock_ud_conf, mock_conf, mock_jinja): - conf = oslo_fixture.Config(cfg.CONF) - conf.config(group="controller_worker", user_data_config_drive=True) + self.conf.config( + group="controller_worker", user_data_config_drive=True) mock_ud_conf.return_value = 'test_ud_conf' createcompute = compute_tasks.ComputeCreate() @@ -176,6 +178,7 @@ class TestComputeTasks(base.TestCase): user_data='test_ud_conf', server_group_id=None) + # Make sure it returns the expected compute_id self.assertEqual(COMPUTE_ID, compute_id) # Test that a build exception is raised @@ -199,8 +202,6 @@ class TestComputeTasks(base.TestCase): # Test that a delete exception is not raised createcompute.revert(COMPUTE_ID, _amphora_mock.id) - conf = oslo_fixture.Config(cfg.CONF) - conf.config(group="controller_worker", user_data_config_drive=False) @mock.patch('jinja2.Environment.get_template') @mock.patch('octavia.amphorae.backends.agent.' @@ -213,8 +214,10 @@ class TestComputeTasks(base.TestCase): createcompute = compute_tasks.ComputeCreate() mock_driver.build.return_value = COMPUTE_ID - conf = oslo_fixture.Config(cfg.CONF) - conf.config(group="controller_worker", amp_ssh_access_allowed=False) + self.conf.config( + group="controller_worker", amp_ssh_access_allowed=False) + self.conf.config( + group="controller_worker", user_data_config_drive=False) # Test execute() compute_id = createcompute.execute(_amphora_mock.id, ports=[_port], @@ -320,12 +323,16 @@ class TestComputeTasks(base.TestCase): createcompute.revert(COMPUTE_ID, _amphora_mock.id) + @mock.patch('octavia.controller.worker.amphora_rate_limit' + '.AmphoraBuildRateLimit.remove_from_build_req_queue') @mock.patch('stevedore.driver.DriverManager.driver') @mock.patch('time.sleep') def test_compute_wait(self, mock_time_sleep, - mock_driver): + mock_driver, + mock_remove_from_build_queue): + self.conf.config(group='haproxy_amphora', build_rate_limit=5) _amphora_mock.compute_id = COMPUTE_ID _amphora_mock.status = constants.ACTIVE _amphora_mock.lb_network_ip = LB_NET_IP @@ -333,7 +340,7 @@ class TestComputeTasks(base.TestCase): mock_driver.get_amphora.return_value = _amphora_mock computewait = compute_tasks.ComputeWait() - computewait.execute(COMPUTE_ID) + computewait.execute(COMPUTE_ID, AMPHORA_ID) mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID) @@ -341,12 +348,18 @@ class TestComputeTasks(base.TestCase): self.assertRaises(exceptions.ComputeWaitTimeoutException, computewait.execute, - _amphora_mock) + _amphora_mock, AMPHORA_ID) + @mock.patch('octavia.controller.worker.amphora_rate_limit' + '.AmphoraBuildRateLimit.remove_from_build_req_queue') @mock.patch('stevedore.driver.DriverManager.driver') @mock.patch('time.sleep') - def test_compute_wait_error_status(self, mock_time_sleep, mock_driver): + def test_compute_wait_error_status(self, + mock_time_sleep, + mock_driver, + mock_remove_from_build_queue): + self.conf.config(group='haproxy_amphora', build_rate_limit=5) _amphora_mock.compute_id = COMPUTE_ID _amphora_mock.status = constants.ACTIVE _amphora_mock.lb_network_ip = LB_NET_IP @@ -354,7 +367,7 @@ class TestComputeTasks(base.TestCase): mock_driver.get_amphora.return_value = _amphora_mock computewait = compute_tasks.ComputeWait() - computewait.execute(COMPUTE_ID) + computewait.execute(COMPUTE_ID, AMPHORA_ID) mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID) @@ -362,7 +375,27 @@ class TestComputeTasks(base.TestCase): self.assertRaises(exceptions.ComputeBuildException, computewait.execute, - _amphora_mock) + _amphora_mock, AMPHORA_ID) + + @mock.patch('octavia.controller.worker.amphora_rate_limit' + '.AmphoraBuildRateLimit.remove_from_build_req_queue') + @mock.patch('stevedore.driver.DriverManager.driver') + @mock.patch('time.sleep') + def test_compute_wait_skipped(self, + mock_time_sleep, + mock_driver, + mock_remove_from_build_queue): + _amphora_mock.compute_id = COMPUTE_ID + _amphora_mock.status = constants.ACTIVE + _amphora_mock.lb_network_ip = LB_NET_IP + + mock_driver.get_amphora.return_value = _amphora_mock + + computewait = compute_tasks.ComputeWait() + computewait.execute(COMPUTE_ID, AMPHORA_ID) + + mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID) + mock_remove_from_build_queue.assert_not_called() @mock.patch('stevedore.driver.DriverManager.driver') def test_delete_amphorae_on_load_balancer(self, mock_driver): @@ -397,6 +430,7 @@ class TestComputeTasks(base.TestCase): mock_driver.create_server_group.assert_called_once_with( 'octavia-lb-123', 'anti-affinity') + # Make sure it returns the expected server group_id self.assertEqual(server_group_test_id, sg_id) # Test revert() diff --git a/octavia/tests/unit/controller/worker/test_amphora_rate_limit.py b/octavia/tests/unit/controller/worker/test_amphora_rate_limit.py new file mode 100644 index 0000000000..5e3f4d0658 --- /dev/null +++ b/octavia/tests/unit/controller/worker/test_amphora_rate_limit.py @@ -0,0 +1,129 @@ +# Copyright 2016 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from oslo_config import cfg +from oslo_config import fixture as oslo_fixture +from oslo_utils import uuidutils + +from octavia.controller.worker import amphora_rate_limit +import octavia.tests.unit.base as base + +AMP_ID = uuidutils.generate_uuid() +BUILD_PRIORITY = 40 +USED_BUILD_SLOTS = 0 + + +class TestAmphoraBuildRateLimit(base.TestCase): + + def setUp(self): + super(TestAmphoraBuildRateLimit, self).setUp() + self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) + + self.rate_limit = amphora_rate_limit.AmphoraBuildRateLimit() + self.amp_build_slots_repo = mock.MagicMock() + self.amp_build_req_repo = mock.MagicMock() + self.conf.config(group='haproxy_amphora', build_rate_limit=1) + + @mock.patch('octavia.db.api.get_session', mock.MagicMock()) + @mock.patch('octavia.controller.worker.amphora_rate_limit' + '.AmphoraBuildRateLimit.wait_for_build_slot') + @mock.patch('octavia.db.repositories.AmphoraBuildReqRepository' + '.add_to_build_queue') + def test_add_to_build_request_queue(self, + mock_add_to_build_queue, + mock_wait_for_build_slot): + self.rate_limit.add_to_build_request_queue(AMP_ID, BUILD_PRIORITY) + + mock_add_to_build_queue.assert_called_once() + mock_wait_for_build_slot.assert_called_once() + + @mock.patch('octavia.db.api.get_session', mock.MagicMock()) + @mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository' + '.get_used_build_slots_count', + return_value=USED_BUILD_SLOTS) + def test_has_build_slot(self, mock_get_used_build_slots_count): + result = self.rate_limit.has_build_slot() + + mock_get_used_build_slots_count.assert_called_once() + self.assertTrue(result) + + @mock.patch('octavia.db.api.get_session', mock.MagicMock()) + @mock.patch('octavia.db.repositories.AmphoraBuildReqRepository' + '.get_highest_priority_build_req', return_value=AMP_ID) + def test_has_highest_priority(self, mock_get_highest_priority_build_req): + result = self.rate_limit.has_highest_priority(AMP_ID) + + mock_get_highest_priority_build_req.assert_called_once() + self.assertTrue(result) + + @mock.patch('octavia.db.api.get_session', mock.MagicMock()) + @mock.patch('octavia.db.repositories.AmphoraBuildReqRepository' + '.update_req_status') + @mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository' + '.update_count') + def test_update_build_status_and_available_build_slots(self, + mock_update_count, + mock_update_status): + self.rate_limit.update_build_status_and_available_build_slots(AMP_ID) + + mock_update_count.assert_called_once() + mock_update_status.assert_called_once() + + @mock.patch('octavia.db.api.get_session', mock.MagicMock()) + @mock.patch('octavia.db.repositories.AmphoraBuildReqRepository.delete') + @mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository' + '.update_count') + def test_remove_from_build_req_queue(self, + mock_update_count, + mock_delete): + self.rate_limit.remove_from_build_req_queue(AMP_ID) + + mock_update_count.assert_called_once() + mock_delete.assert_called_once() + + @mock.patch('octavia.db.api.get_session', mock.MagicMock()) + @mock.patch('octavia.db.repositories.AmphoraBuildReqRepository' + '.delete_all') + @mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository' + '.update_count') + def test_remove_all_from_build_req_queue(self, + mock_update_count, + mock_delete_all): + self.rate_limit.remove_all_from_build_req_queue() + + mock_update_count.assert_called_once() + mock_delete_all.assert_called_once() + + @mock.patch('octavia.controller.worker.amphora_rate_limit' + '.AmphoraBuildRateLimit.has_build_slot', return_value=True) + @mock.patch('octavia.controller.worker.amphora_rate_limit' + '.AmphoraBuildRateLimit.has_highest_priority', + return_value=True) + @mock.patch('octavia.controller.worker.amphora_rate_limit' + '.AmphoraBuildRateLimit.' + 'update_build_status_and_available_build_slots') + @mock.patch('octavia.controller.worker.amphora_rate_limit' + '.AmphoraBuildRateLimit.remove_all_from_build_req_queue') + @mock.patch('time.sleep') + def test_wait_for_build_slot(self, + mock_time_sleep, + mock_remove_all, + mock_update_status_and_slots_count, + mock_has_high_priority, + mock_has_build_slot): + self.rate_limit.wait_for_build_slot(AMP_ID) + + self.assertTrue(mock_has_build_slot.called) + self.assertTrue(mock_has_high_priority.called) diff --git a/octavia/tests/unit/controller/worker/test_controller_worker.py b/octavia/tests/unit/controller/worker/test_controller_worker.py index e9a18de8e0..5287a9d552 100644 --- a/octavia/tests/unit/controller/worker/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/test_controller_worker.py @@ -133,7 +133,10 @@ class TestControllerWorker(base.TestCase): amp = cw.create_amphora() (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with('TEST')) + assert_called_once_with( + 'TEST', + store={constants.BUILD_TYPE_PRIORITY: + constants.LB_CREATE_SPARES_POOL_PRIORITY})) _flow_mock.run.assert_called_once_with() @@ -400,7 +403,8 @@ class TestControllerWorker(base.TestCase): mock_taskflow_load.return_value = mock_eng store = { constants.LOADBALANCER_ID: LB_ID, - 'update_dict': {'topology': constants.TOPOLOGY_SINGLE} + 'update_dict': {'topology': constants.TOPOLOGY_SINGLE}, + constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY } setattr(mock_lb_repo_get.return_value, 'listeners', []) @@ -441,7 +445,8 @@ class TestControllerWorker(base.TestCase): mock_taskflow_load.return_value = mock_eng store = { constants.LOADBALANCER_ID: LB_ID, - 'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY} + 'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY}, + constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY } setattr(mock_lb_repo_get.return_value, 'listeners', []) @@ -483,7 +488,8 @@ class TestControllerWorker(base.TestCase): mock_taskflow_load.return_value = mock_eng store = { constants.LOADBALANCER_ID: LB_ID, - 'update_dict': {'topology': constants.TOPOLOGY_SINGLE} + 'update_dict': {'topology': constants.TOPOLOGY_SINGLE}, + constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY } cw = controller_worker.ControllerWorker() @@ -532,7 +538,8 @@ class TestControllerWorker(base.TestCase): mock_taskflow_load.return_value = mock_eng store = { constants.LOADBALANCER_ID: LB_ID, - 'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY} + 'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY}, + constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY } cw = controller_worker.ControllerWorker() @@ -1081,7 +1088,10 @@ class TestControllerWorker(base.TestCase): _flow_mock, store={constants.FAILED_AMPHORA: _amphora_mock, constants.LOADBALANCER_ID: - _amphora_mock.load_balancer_id})) + _amphora_mock.load_balancer_id, + constants.BUILD_TYPE_PRIORITY: + constants.LB_CREATE_FAILOVER_PRIORITY + })) _flow_mock.run.assert_called_once_with()