Adds a new feature to limit the amphora build rate

This patch limits the number of Amphora build requests handled by the
controller worker at a given time.

Also, the amphora build requests are assigned priorities based on
whether it is a normal loadbalancer create, failover or spares pool
loadbalancer create request. Based on the priority and the order in
which the requests were made if there is an available build slot the
amphora will be built.

Co-Authored-By: Lubosz "diltram" Kosnik <lubosz.kosnik@intel.com>
Change-Id: I967cf0668f82fb3a63e18dc7a457c58b526b7e66
Closes-Bug: #1571802
This commit is contained in:
Aishwarya Thangappa 2016-04-14 19:16:02 -07:00 committed by Lubosz Kosnik (diltram)
parent 3251ee0598
commit e94ff2681f
17 changed files with 539 additions and 68 deletions

View File

@ -116,6 +116,9 @@
# haproxy_template =
# connection_max_retries = 300
# connection_retry_interval = 5
# build_rate_limit = -1
# build_active_retries = 300
# build_retry_interval = 5
# user_group = nogroup
# Maximum number of entries that can fit in the stick table.

View File

@ -160,6 +160,18 @@ haproxy_amphora_opts = [
default=5,
help=_('Retry timeout between connection attempts in '
'seconds.')),
cfg.IntOpt('build_rate_limit',
default=-1,
help=_('Number of amphorae that could be built per controller'
'worker, simultaneously.')),
cfg.IntOpt('build_active_retries',
default=300,
help=_('Retry threshold for waiting for a build slot for '
'an amphorae.')),
cfg.IntOpt('build_retry_interval',
default=5,
help=_('Retry timeout between build attempts in '
'seconds.')),
cfg.StrOpt('user_group',
default='nogroup',
help=_('The user group for haproxy to run under inside the '

View File

@ -258,6 +258,11 @@ SERVICE_AUTH = 'service_auth'
RPC_NAMESPACE_CONTROLLER_AGENT = 'controller'
# Build Type Priority
LB_CREATE_FAILOVER_PRIORITY = 20
LB_CREATE_NORMAL_PRIORITY = 40
LB_CREATE_SPARES_POOL_PRIORITY = 60
BUILD_TYPE_PRIORITY = 'build_type_priority'
# Active standalone roles and topology
TOPOLOGY_SINGLE = 'SINGLE'

View File

@ -20,6 +20,8 @@ Octavia base exception handling.
from oslo_utils import excutils
from webob import exc
from octavia.i18n import _LE
class OctaviaException(Exception):
"""Base Octavia Exception.
@ -157,6 +159,10 @@ class ComputeBuildException(OctaviaException):
message = _('Failed to build compute instance.')
class ComputeBuildQueueTimeoutException(OctaviaException):
message = _LE('Failed to get an amphora build slot.')
class ComputeDeleteException(OctaviaException):
message = _('Failed to delete compute instance.')

View File

@ -0,0 +1,96 @@
# Copyright 2016 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import time
from oslo_config import cfg
from oslo_log import log as logging
from octavia.common import exceptions
from octavia.db import api as db_apis
from octavia.db import repositories as repo
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_group('haproxy_amphora', 'octavia.common.config')
class AmphoraBuildRateLimit(object):
def __init__(self):
self.amp_build_slots_repo = repo.AmphoraBuildSlotsRepository()
self.amp_build_req_repo = repo.AmphoraBuildReqRepository()
def add_to_build_request_queue(self, amphora_id, build_priority):
self.amp_build_req_repo.add_to_build_queue(
db_apis.get_session(),
amphora_id=amphora_id,
priority=build_priority)
LOG.debug("Added build request for %s to the queue", amphora_id)
self.wait_for_build_slot(amphora_id)
def has_build_slot(self):
build_rate_limit = CONF.haproxy_amphora.build_rate_limit
session = db_apis.get_session()
with session.begin(subtransactions=True):
used_build_slots = (self.amp_build_slots_repo
.get_used_build_slots_count(session))
available_build_slots = build_rate_limit - used_build_slots
LOG.debug("Available build slots %d", available_build_slots)
return available_build_slots > 0
def has_highest_priority(self, amphora_id):
session = db_apis.get_session()
with session.begin(subtransactions=True):
highest_priority_build_req = (
self.amp_build_req_repo.get_highest_priority_build_req(
session))
LOG.debug("Highest priority req: %s, Current req: %s",
highest_priority_build_req, amphora_id)
return amphora_id == highest_priority_build_req
def update_build_status_and_available_build_slots(self, amphora_id):
session = db_apis.get_session()
with session.begin(subtransactions=True):
self.amp_build_slots_repo.update_count(session, action='increment')
self.amp_build_req_repo.update_req_status(session, amphora_id)
def remove_from_build_req_queue(self, amphora_id):
session = db_apis.get_session()
with session.begin(subtransactions=True):
self.amp_build_req_repo.delete(session, amphora_id=amphora_id)
self.amp_build_slots_repo.update_count(session, action='decrement')
LOG.debug("Removed request for %s from queue"
" and released the build slot", amphora_id)
def remove_all_from_build_req_queue(self):
session = db_apis.get_session()
with session.begin(subtransactions=True):
self.amp_build_req_repo.delete_all(session)
self.amp_build_slots_repo.update_count(session, action='reset')
LOG.debug("Removed all the build requests and "
"released the build slots")
def wait_for_build_slot(self, amphora_id):
LOG.debug("Waiting for a build slot")
for i in range(CONF.haproxy_amphora.build_active_retries):
if (self.has_build_slot() and
self.has_highest_priority(amphora_id)):
self.update_build_status_and_available_build_slots(amphora_id)
return
time.sleep(CONF.haproxy_amphora.build_retry_interval)
self.remove_all_from_build_req_queue()
raise exceptions.ComputeBuildQueueTimeoutException()

View File

@ -80,8 +80,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
:returns: amphora_id
"""
create_amp_tf = self._taskflow_load(self._amphora_flows.
get_create_amphora_flow())
create_amp_tf = self._taskflow_load(
self._amphora_flows.get_create_amphora_flow(),
store={constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY}
)
with tf_logging.DynamicLoggingListener(
create_amp_tf, log=LOG,
hide_inputs_outputs_of=self._exclude_result_logging_tasks):
@ -261,7 +264,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
:raises NoSuitableAmphoraException: Unable to allocate an Amphora.
"""
store = {constants.LOADBALANCER_ID: load_balancer_id}
store = {constants.LOADBALANCER_ID: load_balancer_id,
constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_NORMAL_PRIORITY}
topology = CONF.controller_worker.loadbalancer_topology
@ -622,7 +627,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
self._amphora_flows.get_failover_flow(role=amp.role,
status=amp.status),
store={constants.FAILED_AMPHORA: amp,
constants.LOADBALANCER_ID: amp.load_balancer_id})
constants.LOADBALANCER_ID: amp.load_balancer_id,
constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_FAILOVER_PRIORITY
})
with tf_logging.DynamicLoggingListener(
failover_amphora_tf, log=LOG,
hide_inputs_outputs_of=self._exclude_result_logging_tasks):

View File

@ -53,16 +53,17 @@ class AmphoraFlows(object):
requires=(constants.AMPHORA_ID, constants.SERVER_PEM)))
create_amphora_flow.add(compute_tasks.CertComputeCreate(
requires=(constants.AMPHORA_ID, constants.SERVER_PEM),
requires=(constants.AMPHORA_ID, constants.SERVER_PEM,
constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
else:
create_amphora_flow.add(compute_tasks.ComputeCreate(
requires=constants.AMPHORA_ID,
requires=(constants.AMPHORA_ID, constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amphora_flow.add(compute_tasks.ComputeWait(
requires=constants.COMPUTE_ID,
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
provides=constants.COMPUTE_OBJ))
create_amphora_flow.add(database_tasks.UpdateAmphoraInfo(
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
@ -130,12 +131,14 @@ class AmphoraFlows(object):
create_amp_for_lb_subflow.add(compute_tasks.CertComputeCreate(
name=sf_name + '-' + constants.CERT_COMPUTE_CREATE,
requires=(constants.AMPHORA_ID, constants.SERVER_PEM,
constants.SERVER_GROUP_ID),
constants.SERVER_GROUP_ID,
constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
else:
create_amp_for_lb_subflow.add(compute_tasks.CertComputeCreate(
name=sf_name + '-' + constants.CERT_COMPUTE_CREATE,
requires=(constants.AMPHORA_ID, constants.SERVER_PEM),
requires=(constants.AMPHORA_ID, constants.SERVER_PEM,
constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
else:
@ -143,12 +146,14 @@ class AmphoraFlows(object):
) and anti_affinity:
create_amp_for_lb_subflow.add(compute_tasks.ComputeCreate(
name=sf_name + '-' + constants.COMPUTE_CREATE,
requires=(constants.AMPHORA_ID, constants.SERVER_GROUP_ID),
requires=(constants.AMPHORA_ID, constants.SERVER_GROUP_ID,
constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
else:
create_amp_for_lb_subflow.add(compute_tasks.ComputeCreate(
name=sf_name + '-' + constants.COMPUTE_CREATE,
requires=constants.AMPHORA_ID,
requires=(constants.AMPHORA_ID,
constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
create_amp_for_lb_subflow.add(database_tasks.UpdateAmphoraComputeId(
@ -159,7 +164,7 @@ class AmphoraFlows(object):
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amp_for_lb_subflow.add(compute_tasks.ComputeWait(
name=sf_name + '-' + constants.COMPUTE_WAIT,
requires=constants.COMPUTE_ID,
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
provides=constants.COMPUTE_OBJ))
create_amp_for_lb_subflow.add(database_tasks.UpdateAmphoraInfo(
name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO,

View File

@ -26,6 +26,7 @@ from octavia.amphorae.backends.agent import agent_jinja_cfg
from octavia.common import constants
from octavia.common import exceptions
from octavia.common.jinja import user_data_jinja_cfg
from octavia.controller.worker import amphora_rate_limit
from octavia.i18n import _LE, _LW
CONF = cfg.CONF
@ -42,12 +43,15 @@ class BaseComputeTask(task.Task):
name=CONF.controller_worker.compute_driver,
invoke_on_load=True
).driver
self.rate_limit = amphora_rate_limit.AmphoraBuildRateLimit()
class ComputeCreate(BaseComputeTask):
"""Create the compute instance for a new amphora."""
def execute(self, amphora_id, ports=None, config_drive_files=None,
def execute(self, amphora_id,
build_type_priority=constants.LB_CREATE_NORMAL_PRIORITY,
ports=None, config_drive_files=None,
server_group_id=None):
"""Create an amphora
@ -65,6 +69,10 @@ class ComputeCreate(BaseComputeTask):
key_name = None if not ssh_access else ssh_key
try:
if CONF.haproxy_amphora.build_rate_limit != -1:
self.rate_limit.add_to_build_request_queue(
amphora_id, build_type_priority)
agent_cfg = agent_jinja_cfg.AgentJinjaTemplater()
config_drive_files['/etc/octavia/amphora-agent.conf'] = (
agent_cfg.build_agent_config(amphora_id))
@ -115,8 +123,9 @@ class ComputeCreate(BaseComputeTask):
class CertComputeCreate(ComputeCreate):
def execute(self, amphora_id, server_pem, ports=None,
server_group_id=None):
def execute(self, amphora_id, server_pem,
build_type_priority=constants.LB_CREATE_NORMAL_PRIORITY,
ports=None, server_group_id=None):
"""Create an amphora
:returns: an amphora
@ -129,7 +138,8 @@ class CertComputeCreate(ComputeCreate):
'/etc/octavia/certs/server.pem': server_pem,
'/etc/octavia/certs/client_ca.pem': ca}
return super(CertComputeCreate, self).execute(
amphora_id, ports=ports, config_drive_files=config_drive_files,
amphora_id, build_type_priority=build_type_priority,
ports=ports, config_drive_files=config_drive_files,
server_group_id=server_group_id)
@ -167,7 +177,7 @@ class ComputeDelete(BaseComputeTask):
class ComputeWait(BaseComputeTask):
"""Wait for the compute driver to mark the amphora active."""
def execute(self, compute_id):
def execute(self, compute_id, amphora_id):
"""Wait for the compute driver to mark the amphora active
:raises: Generic exception if the amphora is not active
@ -176,6 +186,8 @@ class ComputeWait(BaseComputeTask):
for i in range(CONF.controller_worker.amp_active_retries):
amp = self.compute.get_amphora(compute_id)
if amp.status == constants.ACTIVE:
if CONF.haproxy_amphora.build_rate_limit != -1:
self.rate_limit.remove_from_build_req_queue(amphora_id)
return amp
elif amp.status == constants.ERROR:
raise exceptions.ComputeBuildException()

View File

@ -0,0 +1,64 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""create_amphora_build_rate_limit_tables
Revision ID: fc5582da7d8a
Revises: 443fe6676637
Create Date: 2016-04-07 19:42:28.171902
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import sql
# revision identifiers, used by Alembic.
revision = 'fc5582da7d8a'
down_revision = '443fe6676637'
def upgrade():
op.create_table(
u'amphora_build_slots',
sa.Column(u'id', sa.Integer(), primary_key=True),
sa.Column(u'slots_used', sa.Integer(), default=0)
)
# Create temporary table for table data seeding
insert_table = sql.table(
u'amphora_build_slots',
sql.column(u'id', sa.Integer),
sql.column(u'slots_used', sa.Integer)
)
op.bulk_insert(
insert_table,
[
{'id': 1, 'slots_used': 0}
]
)
op.create_table(
u'amphora_build_request',
sa.Column(u'amphora_id', sa.String(36), nullable=True,
primary_key=True),
sa.Column(u'priority', sa.Integer()),
sa.Column(u'created_time', sa.DateTime(timezone=True), nullable=False),
sa.Column(u'status', sa.String(16), default='WAITING', nullable=False)
)
def downgrade():
pass

View File

@ -84,6 +84,24 @@ class L7PolicyAction(base_models.BASE, base_models.LookupTableMixin):
__tablename__ = "l7policy_action"
class AmphoraBuildSlots(base_models.BASE):
__tablename__ = "amphora_build_slots"
id = sa.Column(sa.Integer(), primary_key=True)
slots_used = sa.Column(sa.Integer())
class AmphoraBuildRequest(base_models.BASE):
__tablename__ = "amphora_build_request"
amphora_id = sa.Column(sa.String(36), nullable=True, primary_key=True)
priority = sa.Column(sa.Integer())
created_time = sa.Column(sa.DateTime, default=func.now(), nullable=False)
status = sa.Column(sa.String(16), default='WAITING', nullable=False)
class SessionPersistence(base_models.BASE):
__data_model__ = data_models.SessionPersistence

View File

@ -142,6 +142,8 @@ class Repositories(object):
self.vrrpgroup = VRRPGroupRepository()
self.l7rule = L7RuleRepository()
self.l7policy = L7PolicyRepository()
self.amp_build_slots = AmphoraBuildSlotsRepository()
self.amp_build_req = AmphoraBuildReqRepository()
self.quotas = QuotasRepository()
def create_load_balancer_and_vip(self, session, lb_dict, vip_dict):
@ -900,6 +902,73 @@ class AmphoraRepository(BaseRepository):
return amp.to_data_model()
class AmphoraBuildReqRepository(BaseRepository):
model_class = models.AmphoraBuildRequest
def add_to_build_queue(self, session, amphora_id=None, priority=None):
"""Adds the build request to the table."""
with session.begin(subtransactions=True):
model = self.model_class(amphora_id=amphora_id, priority=priority)
session.add(model)
def update_req_status(self, session, amphora_id=None):
"""Updates the request status."""
with session.begin(subtransactions=True):
(session.query(self.model_class)
.filter_by(amphora_id=amphora_id)
.update({self.model_class.status: 'BUILDING'}))
def get_highest_priority_build_req(self, session):
"""Fetches build request with highest priority and least created_time.
priority 20 = failover (highest)
priority 40 = create_loadbalancer
priority 60 = sparespool (least)
:param session: A Sql Alchemy database session.
:returns amphora_id corresponding to highest priority and least created
time in 'WAITING' status.
"""
with session.begin(subtransactions=True):
return (session.query(self.model_class.amphora_id)
.order_by(self.model_class.status.desc())
.order_by(self.model_class.priority.asc())
.order_by(self.model_class.created_time.asc())
.first())[0]
def delete_all(self, session):
"Deletes all the build requests."
with session.begin(subtransactions=True):
session.query(self.model_class).delete()
class AmphoraBuildSlotsRepository(BaseRepository):
model_class = models.AmphoraBuildSlots
def get_used_build_slots_count(self, session):
"""Gets the number of build slots in use.
:returns: Number of current build slots.
"""
with session.begin(subtransactions=True):
count = session.query(self.model_class.slots_used).one()
return count[0]
def update_count(self, session, action='increment'):
"""Increments/Decrements/Resets the number of build_slots used."""
with session.begin(subtransactions=True):
if action == 'increment':
session.query(self.model_class).filter_by(id=1).update(
{self.model_class.slots_used:
self.get_used_build_slots_count(session) + 1})
elif action == 'decrement':
session.query(self.model_class).filter_by(id=1).update(
{self.model_class.slots_used:
self.get_used_build_slots_count(session) - 1})
elif action == 'reset':
session.query(self.model_class).filter_by(id=1).update(
{self.model_class.slots_used: 0})
class SNIRepository(BaseRepository):
model_class = models.SNI

View File

@ -99,7 +99,7 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
'session_persistence', 'pool', 'member', 'listener',
'listener_stats', 'amphora', 'sni',
'amphorahealth', 'vrrpgroup', 'l7rule', 'l7policy',
'quotas')
'amp_build_slots', 'amp_build_req', 'quotas')
for repo_attr in repo_attr_names:
single_repo = getattr(self.repos, repo_attr, None)
message = ("Class Repositories should have %s instance"

View File

@ -51,7 +51,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(0, len(amp_flow.requires))
self.assertEqual(1, len(amp_flow.requires))
def test_get_create_amphora_flow_cert(self, mock_get_net_driver):
self.AmpFlow = amphora_flows.AmphoraFlows()
@ -65,7 +65,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(0, len(amp_flow.requires))
self.assertEqual(1, len(amp_flow.requires))
def test_get_create_amphora_for_lb_flow(self, mock_get_net_driver):
@ -83,7 +83,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(2, len(amp_flow.requires))
def test_get_cert_create_amphora_for_lb_flow(self, mock_get_net_driver):
@ -103,7 +103,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(2, len(amp_flow.requires))
def test_get_cert_master_create_amphora_for_lb_flow(
self, mock_get_net_driver):
@ -124,7 +124,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(2, len(amp_flow.requires))
def test_get_cert_master_rest_anti_affinity_create_amphora_for_lb_flow(
self, mock_get_net_driver):
@ -143,7 +143,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.conf.config(group="nova", enable_anti_affinity=False)
def test_get_cert_backup_create_amphora_for_lb_flow(
@ -164,7 +164,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(2, len(amp_flow.requires))
def test_get_cert_bogus_create_amphora_for_lb_flow(
self, mock_get_net_driver):
@ -184,7 +184,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(2, len(amp_flow.requires))
def test_get_cert_backup_rest_anti_affinity_create_amphora_for_lb_flow(
self, mock_get_net_driver):
@ -202,7 +202,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.conf.config(group="nova", enable_anti_affinity=False)
def test_get_delete_amphora_flow(self, mock_get_net_driver):
@ -253,7 +253,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
@ -273,7 +273,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
@ -293,7 +293,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
@ -313,7 +313,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
def test_get_failover_flow_spare(self, mock_get_net_driver):

View File

@ -212,7 +212,7 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
create_flow.provides)
self.assertEqual(2, len(create_flow.requires))
self.assertEqual(3, len(create_flow.requires))
self.assertEqual(12, len(create_flow.provides),
create_flow.provides)
@ -240,6 +240,6 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
create_flow.provides)
self.assertEqual(2, len(create_flow.requires))
self.assertEqual(3, len(create_flow.requires))
self.assertEqual(12, len(create_flow.provides),
create_flow.provides)

View File

@ -59,17 +59,22 @@ _port.id = PORT_ID
class TestComputeTasks(base.TestCase):
def setUp(self):
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker", amp_flavor_id=AMP_FLAVOR_ID)
conf.config(group="controller_worker", amp_image_id=AMP_IMAGE_ID)
conf.config(group="controller_worker", amp_image_tag=AMP_IMAGE_TAG)
conf.config(group="controller_worker",
amp_ssh_key_name=AMP_SSH_KEY_NAME)
conf.config(group="controller_worker", amp_boot_network_list=AMP_NET)
conf.config(group="controller_worker", amp_active_wait_sec=AMP_WAIT)
conf.config(group="controller_worker",
amp_secgroup_list=AMP_SEC_GROUPS)
conf.config(group="controller_worker", amp_image_owner_id='')
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
self.conf.config(
group="controller_worker", amp_flavor_id=AMP_FLAVOR_ID)
self.conf.config(
group="controller_worker", amp_image_id=AMP_IMAGE_ID)
self.conf.config(
group="controller_worker", amp_image_tag=AMP_IMAGE_TAG)
self.conf.config(
group="controller_worker", amp_ssh_key_name=AMP_SSH_KEY_NAME)
self.conf.config(
group="controller_worker", amp_boot_network_list=AMP_NET)
self.conf.config(
group="controller_worker", amp_active_wait_sec=AMP_WAIT)
self.conf.config(
group="controller_worker", amp_secgroup_list=AMP_SEC_GROUPS)
self.conf.config(group="controller_worker", amp_image_owner_id='')
_amphora_mock.id = AMPHORA_ID
_amphora_mock.status = constants.AMPHORA_ALLOCATED
@ -87,9 +92,8 @@ class TestComputeTasks(base.TestCase):
def test_compute_create(self, mock_driver, mock_conf, mock_jinja):
image_owner_id = uuidutils.generate_uuid()
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker",
amp_image_owner_id=image_owner_id)
self.conf.config(
group="controller_worker", amp_image_owner_id=image_owner_id)
createcompute = compute_tasks.ComputeCreate()
@ -114,6 +118,7 @@ class TestComputeTasks(base.TestCase):
user_data=None,
server_group_id=SERVER_GRPOUP_ID)
# Make sure it returns the expected compute_id
self.assertEqual(COMPUTE_ID, compute_id)
# Test that a build exception is raised
@ -138,9 +143,6 @@ class TestComputeTasks(base.TestCase):
createcompute.revert(COMPUTE_ID, _amphora_mock.id)
conf.config(group="controller_worker",
amp_image_owner_id='')
@mock.patch('jinja2.Environment.get_template')
@mock.patch('octavia.amphorae.backends.agent.'
'agent_jinja_cfg.AgentJinjaTemplater.'
@ -152,8 +154,8 @@ class TestComputeTasks(base.TestCase):
def test_compute_create_user_data(self, mock_driver,
mock_ud_conf, mock_conf, mock_jinja):
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker", user_data_config_drive=True)
self.conf.config(
group="controller_worker", user_data_config_drive=True)
mock_ud_conf.return_value = 'test_ud_conf'
createcompute = compute_tasks.ComputeCreate()
@ -176,6 +178,7 @@ class TestComputeTasks(base.TestCase):
user_data='test_ud_conf',
server_group_id=None)
# Make sure it returns the expected compute_id
self.assertEqual(COMPUTE_ID, compute_id)
# Test that a build exception is raised
@ -199,8 +202,6 @@ class TestComputeTasks(base.TestCase):
# Test that a delete exception is not raised
createcompute.revert(COMPUTE_ID, _amphora_mock.id)
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker", user_data_config_drive=False)
@mock.patch('jinja2.Environment.get_template')
@mock.patch('octavia.amphorae.backends.agent.'
@ -213,8 +214,10 @@ class TestComputeTasks(base.TestCase):
createcompute = compute_tasks.ComputeCreate()
mock_driver.build.return_value = COMPUTE_ID
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker", amp_ssh_access_allowed=False)
self.conf.config(
group="controller_worker", amp_ssh_access_allowed=False)
self.conf.config(
group="controller_worker", user_data_config_drive=False)
# Test execute()
compute_id = createcompute.execute(_amphora_mock.id, ports=[_port],
@ -320,12 +323,16 @@ class TestComputeTasks(base.TestCase):
createcompute.revert(COMPUTE_ID, _amphora_mock.id)
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.remove_from_build_req_queue')
@mock.patch('stevedore.driver.DriverManager.driver')
@mock.patch('time.sleep')
def test_compute_wait(self,
mock_time_sleep,
mock_driver):
mock_driver,
mock_remove_from_build_queue):
self.conf.config(group='haproxy_amphora', build_rate_limit=5)
_amphora_mock.compute_id = COMPUTE_ID
_amphora_mock.status = constants.ACTIVE
_amphora_mock.lb_network_ip = LB_NET_IP
@ -333,7 +340,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _amphora_mock
computewait = compute_tasks.ComputeWait()
computewait.execute(COMPUTE_ID)
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
@ -341,12 +348,18 @@ class TestComputeTasks(base.TestCase):
self.assertRaises(exceptions.ComputeWaitTimeoutException,
computewait.execute,
_amphora_mock)
_amphora_mock, AMPHORA_ID)
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.remove_from_build_req_queue')
@mock.patch('stevedore.driver.DriverManager.driver')
@mock.patch('time.sleep')
def test_compute_wait_error_status(self, mock_time_sleep, mock_driver):
def test_compute_wait_error_status(self,
mock_time_sleep,
mock_driver,
mock_remove_from_build_queue):
self.conf.config(group='haproxy_amphora', build_rate_limit=5)
_amphora_mock.compute_id = COMPUTE_ID
_amphora_mock.status = constants.ACTIVE
_amphora_mock.lb_network_ip = LB_NET_IP
@ -354,7 +367,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _amphora_mock
computewait = compute_tasks.ComputeWait()
computewait.execute(COMPUTE_ID)
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
@ -362,7 +375,27 @@ class TestComputeTasks(base.TestCase):
self.assertRaises(exceptions.ComputeBuildException,
computewait.execute,
_amphora_mock)
_amphora_mock, AMPHORA_ID)
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.remove_from_build_req_queue')
@mock.patch('stevedore.driver.DriverManager.driver')
@mock.patch('time.sleep')
def test_compute_wait_skipped(self,
mock_time_sleep,
mock_driver,
mock_remove_from_build_queue):
_amphora_mock.compute_id = COMPUTE_ID
_amphora_mock.status = constants.ACTIVE
_amphora_mock.lb_network_ip = LB_NET_IP
mock_driver.get_amphora.return_value = _amphora_mock
computewait = compute_tasks.ComputeWait()
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
mock_remove_from_build_queue.assert_not_called()
@mock.patch('stevedore.driver.DriverManager.driver')
def test_delete_amphorae_on_load_balancer(self, mock_driver):
@ -397,6 +430,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.create_server_group.assert_called_once_with(
'octavia-lb-123', 'anti-affinity')
# Make sure it returns the expected server group_id
self.assertEqual(server_group_test_id, sg_id)
# Test revert()

View File

@ -0,0 +1,129 @@
# Copyright 2016 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.controller.worker import amphora_rate_limit
import octavia.tests.unit.base as base
AMP_ID = uuidutils.generate_uuid()
BUILD_PRIORITY = 40
USED_BUILD_SLOTS = 0
class TestAmphoraBuildRateLimit(base.TestCase):
def setUp(self):
super(TestAmphoraBuildRateLimit, self).setUp()
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
self.rate_limit = amphora_rate_limit.AmphoraBuildRateLimit()
self.amp_build_slots_repo = mock.MagicMock()
self.amp_build_req_repo = mock.MagicMock()
self.conf.config(group='haproxy_amphora', build_rate_limit=1)
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.wait_for_build_slot')
@mock.patch('octavia.db.repositories.AmphoraBuildReqRepository'
'.add_to_build_queue')
def test_add_to_build_request_queue(self,
mock_add_to_build_queue,
mock_wait_for_build_slot):
self.rate_limit.add_to_build_request_queue(AMP_ID, BUILD_PRIORITY)
mock_add_to_build_queue.assert_called_once()
mock_wait_for_build_slot.assert_called_once()
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository'
'.get_used_build_slots_count',
return_value=USED_BUILD_SLOTS)
def test_has_build_slot(self, mock_get_used_build_slots_count):
result = self.rate_limit.has_build_slot()
mock_get_used_build_slots_count.assert_called_once()
self.assertTrue(result)
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.db.repositories.AmphoraBuildReqRepository'
'.get_highest_priority_build_req', return_value=AMP_ID)
def test_has_highest_priority(self, mock_get_highest_priority_build_req):
result = self.rate_limit.has_highest_priority(AMP_ID)
mock_get_highest_priority_build_req.assert_called_once()
self.assertTrue(result)
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.db.repositories.AmphoraBuildReqRepository'
'.update_req_status')
@mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository'
'.update_count')
def test_update_build_status_and_available_build_slots(self,
mock_update_count,
mock_update_status):
self.rate_limit.update_build_status_and_available_build_slots(AMP_ID)
mock_update_count.assert_called_once()
mock_update_status.assert_called_once()
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.db.repositories.AmphoraBuildReqRepository.delete')
@mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository'
'.update_count')
def test_remove_from_build_req_queue(self,
mock_update_count,
mock_delete):
self.rate_limit.remove_from_build_req_queue(AMP_ID)
mock_update_count.assert_called_once()
mock_delete.assert_called_once()
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.db.repositories.AmphoraBuildReqRepository'
'.delete_all')
@mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository'
'.update_count')
def test_remove_all_from_build_req_queue(self,
mock_update_count,
mock_delete_all):
self.rate_limit.remove_all_from_build_req_queue()
mock_update_count.assert_called_once()
mock_delete_all.assert_called_once()
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.has_build_slot', return_value=True)
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.has_highest_priority',
return_value=True)
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.'
'update_build_status_and_available_build_slots')
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.remove_all_from_build_req_queue')
@mock.patch('time.sleep')
def test_wait_for_build_slot(self,
mock_time_sleep,
mock_remove_all,
mock_update_status_and_slots_count,
mock_has_high_priority,
mock_has_build_slot):
self.rate_limit.wait_for_build_slot(AMP_ID)
self.assertTrue(mock_has_build_slot.called)
self.assertTrue(mock_has_high_priority.called)

View File

@ -133,7 +133,10 @@ class TestControllerWorker(base.TestCase):
amp = cw.create_amphora()
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with('TEST'))
assert_called_once_with(
'TEST',
store={constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY}))
_flow_mock.run.assert_called_once_with()
@ -400,7 +403,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load.return_value = mock_eng
store = {
constants.LOADBALANCER_ID: LB_ID,
'update_dict': {'topology': constants.TOPOLOGY_SINGLE}
'update_dict': {'topology': constants.TOPOLOGY_SINGLE},
constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY
}
setattr(mock_lb_repo_get.return_value, 'listeners', [])
@ -441,7 +445,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load.return_value = mock_eng
store = {
constants.LOADBALANCER_ID: LB_ID,
'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY}
'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY},
constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY
}
setattr(mock_lb_repo_get.return_value, 'listeners', [])
@ -483,7 +488,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load.return_value = mock_eng
store = {
constants.LOADBALANCER_ID: LB_ID,
'update_dict': {'topology': constants.TOPOLOGY_SINGLE}
'update_dict': {'topology': constants.TOPOLOGY_SINGLE},
constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY
}
cw = controller_worker.ControllerWorker()
@ -532,7 +538,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load.return_value = mock_eng
store = {
constants.LOADBALANCER_ID: LB_ID,
'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY}
'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY},
constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY
}
cw = controller_worker.ControllerWorker()
@ -1081,7 +1088,10 @@ class TestControllerWorker(base.TestCase):
_flow_mock,
store={constants.FAILED_AMPHORA: _amphora_mock,
constants.LOADBALANCER_ID:
_amphora_mock.load_balancer_id}))
_amphora_mock.load_balancer_id,
constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_FAILOVER_PRIORITY
}))
_flow_mock.run.assert_called_once_with()