Merge "Adds a new feature to limit the amphora build rate"

This commit is contained in:
Jenkins 2017-03-16 20:59:12 +00:00 committed by Gerrit Code Review
commit 114c6428f1
17 changed files with 539 additions and 68 deletions

View File

@ -116,6 +116,9 @@
# haproxy_template =
# connection_max_retries = 300
# connection_retry_interval = 5
# build_rate_limit = -1
# build_active_retries = 300
# build_retry_interval = 5
# user_group = nogroup
# Maximum number of entries that can fit in the stick table.

View File

@ -160,6 +160,18 @@ haproxy_amphora_opts = [
default=5,
help=_('Retry timeout between connection attempts in '
'seconds.')),
cfg.IntOpt('build_rate_limit',
default=-1,
help=_('Number of amphorae that could be built per controller'
'worker, simultaneously.')),
cfg.IntOpt('build_active_retries',
default=300,
help=_('Retry threshold for waiting for a build slot for '
'an amphorae.')),
cfg.IntOpt('build_retry_interval',
default=5,
help=_('Retry timeout between build attempts in '
'seconds.')),
cfg.StrOpt('user_group',
default='nogroup',
help=_('The user group for haproxy to run under inside the '

View File

@ -258,6 +258,11 @@ SERVICE_AUTH = 'service_auth'
RPC_NAMESPACE_CONTROLLER_AGENT = 'controller'
# Build Type Priority
LB_CREATE_FAILOVER_PRIORITY = 20
LB_CREATE_NORMAL_PRIORITY = 40
LB_CREATE_SPARES_POOL_PRIORITY = 60
BUILD_TYPE_PRIORITY = 'build_type_priority'
# Active standalone roles and topology
TOPOLOGY_SINGLE = 'SINGLE'

View File

@ -20,6 +20,8 @@ Octavia base exception handling.
from oslo_utils import excutils
from webob import exc
from octavia.i18n import _LE
class OctaviaException(Exception):
"""Base Octavia Exception.
@ -162,6 +164,10 @@ class ComputeBuildException(OctaviaException):
message = _('Failed to build compute instance.')
class ComputeBuildQueueTimeoutException(OctaviaException):
message = _LE('Failed to get an amphora build slot.')
class ComputeDeleteException(OctaviaException):
message = _('Failed to delete compute instance.')

View File

@ -0,0 +1,96 @@
# Copyright 2016 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import time
from oslo_config import cfg
from oslo_log import log as logging
from octavia.common import exceptions
from octavia.db import api as db_apis
from octavia.db import repositories as repo
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_group('haproxy_amphora', 'octavia.common.config')
class AmphoraBuildRateLimit(object):
def __init__(self):
self.amp_build_slots_repo = repo.AmphoraBuildSlotsRepository()
self.amp_build_req_repo = repo.AmphoraBuildReqRepository()
def add_to_build_request_queue(self, amphora_id, build_priority):
self.amp_build_req_repo.add_to_build_queue(
db_apis.get_session(),
amphora_id=amphora_id,
priority=build_priority)
LOG.debug("Added build request for %s to the queue", amphora_id)
self.wait_for_build_slot(amphora_id)
def has_build_slot(self):
build_rate_limit = CONF.haproxy_amphora.build_rate_limit
session = db_apis.get_session()
with session.begin(subtransactions=True):
used_build_slots = (self.amp_build_slots_repo
.get_used_build_slots_count(session))
available_build_slots = build_rate_limit - used_build_slots
LOG.debug("Available build slots %d", available_build_slots)
return available_build_slots > 0
def has_highest_priority(self, amphora_id):
session = db_apis.get_session()
with session.begin(subtransactions=True):
highest_priority_build_req = (
self.amp_build_req_repo.get_highest_priority_build_req(
session))
LOG.debug("Highest priority req: %s, Current req: %s",
highest_priority_build_req, amphora_id)
return amphora_id == highest_priority_build_req
def update_build_status_and_available_build_slots(self, amphora_id):
session = db_apis.get_session()
with session.begin(subtransactions=True):
self.amp_build_slots_repo.update_count(session, action='increment')
self.amp_build_req_repo.update_req_status(session, amphora_id)
def remove_from_build_req_queue(self, amphora_id):
session = db_apis.get_session()
with session.begin(subtransactions=True):
self.amp_build_req_repo.delete(session, amphora_id=amphora_id)
self.amp_build_slots_repo.update_count(session, action='decrement')
LOG.debug("Removed request for %s from queue"
" and released the build slot", amphora_id)
def remove_all_from_build_req_queue(self):
session = db_apis.get_session()
with session.begin(subtransactions=True):
self.amp_build_req_repo.delete_all(session)
self.amp_build_slots_repo.update_count(session, action='reset')
LOG.debug("Removed all the build requests and "
"released the build slots")
def wait_for_build_slot(self, amphora_id):
LOG.debug("Waiting for a build slot")
for i in range(CONF.haproxy_amphora.build_active_retries):
if (self.has_build_slot() and
self.has_highest_priority(amphora_id)):
self.update_build_status_and_available_build_slots(amphora_id)
return
time.sleep(CONF.haproxy_amphora.build_retry_interval)
self.remove_all_from_build_req_queue()
raise exceptions.ComputeBuildQueueTimeoutException()

View File

@ -80,8 +80,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
:returns: amphora_id
"""
create_amp_tf = self._taskflow_load(self._amphora_flows.
get_create_amphora_flow())
create_amp_tf = self._taskflow_load(
self._amphora_flows.get_create_amphora_flow(),
store={constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY}
)
with tf_logging.DynamicLoggingListener(
create_amp_tf, log=LOG,
hide_inputs_outputs_of=self._exclude_result_logging_tasks):
@ -261,7 +264,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
:raises NoSuitableAmphoraException: Unable to allocate an Amphora.
"""
store = {constants.LOADBALANCER_ID: load_balancer_id}
store = {constants.LOADBALANCER_ID: load_balancer_id,
constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_NORMAL_PRIORITY}
topology = CONF.controller_worker.loadbalancer_topology
@ -622,7 +627,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
self._amphora_flows.get_failover_flow(role=amp.role,
status=amp.status),
store={constants.FAILED_AMPHORA: amp,
constants.LOADBALANCER_ID: amp.load_balancer_id})
constants.LOADBALANCER_ID: amp.load_balancer_id,
constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_FAILOVER_PRIORITY
})
with tf_logging.DynamicLoggingListener(
failover_amphora_tf, log=LOG,
hide_inputs_outputs_of=self._exclude_result_logging_tasks):

View File

@ -53,16 +53,17 @@ class AmphoraFlows(object):
requires=(constants.AMPHORA_ID, constants.SERVER_PEM)))
create_amphora_flow.add(compute_tasks.CertComputeCreate(
requires=(constants.AMPHORA_ID, constants.SERVER_PEM),
requires=(constants.AMPHORA_ID, constants.SERVER_PEM,
constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
else:
create_amphora_flow.add(compute_tasks.ComputeCreate(
requires=constants.AMPHORA_ID,
requires=(constants.AMPHORA_ID, constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amphora_flow.add(compute_tasks.ComputeWait(
requires=constants.COMPUTE_ID,
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
provides=constants.COMPUTE_OBJ))
create_amphora_flow.add(database_tasks.UpdateAmphoraInfo(
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
@ -130,12 +131,14 @@ class AmphoraFlows(object):
create_amp_for_lb_subflow.add(compute_tasks.CertComputeCreate(
name=sf_name + '-' + constants.CERT_COMPUTE_CREATE,
requires=(constants.AMPHORA_ID, constants.SERVER_PEM,
constants.SERVER_GROUP_ID),
constants.SERVER_GROUP_ID,
constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
else:
create_amp_for_lb_subflow.add(compute_tasks.CertComputeCreate(
name=sf_name + '-' + constants.CERT_COMPUTE_CREATE,
requires=(constants.AMPHORA_ID, constants.SERVER_PEM),
requires=(constants.AMPHORA_ID, constants.SERVER_PEM,
constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
else:
@ -143,12 +146,14 @@ class AmphoraFlows(object):
) and anti_affinity:
create_amp_for_lb_subflow.add(compute_tasks.ComputeCreate(
name=sf_name + '-' + constants.COMPUTE_CREATE,
requires=(constants.AMPHORA_ID, constants.SERVER_GROUP_ID),
requires=(constants.AMPHORA_ID, constants.SERVER_GROUP_ID,
constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
else:
create_amp_for_lb_subflow.add(compute_tasks.ComputeCreate(
name=sf_name + '-' + constants.COMPUTE_CREATE,
requires=constants.AMPHORA_ID,
requires=(constants.AMPHORA_ID,
constants.BUILD_TYPE_PRIORITY),
provides=constants.COMPUTE_ID))
create_amp_for_lb_subflow.add(database_tasks.UpdateAmphoraComputeId(
@ -159,7 +164,7 @@ class AmphoraFlows(object):
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amp_for_lb_subflow.add(compute_tasks.ComputeWait(
name=sf_name + '-' + constants.COMPUTE_WAIT,
requires=constants.COMPUTE_ID,
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
provides=constants.COMPUTE_OBJ))
create_amp_for_lb_subflow.add(database_tasks.UpdateAmphoraInfo(
name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO,

View File

@ -26,6 +26,7 @@ from octavia.amphorae.backends.agent import agent_jinja_cfg
from octavia.common import constants
from octavia.common import exceptions
from octavia.common.jinja import user_data_jinja_cfg
from octavia.controller.worker import amphora_rate_limit
from octavia.i18n import _LE, _LW
CONF = cfg.CONF
@ -42,12 +43,15 @@ class BaseComputeTask(task.Task):
name=CONF.controller_worker.compute_driver,
invoke_on_load=True
).driver
self.rate_limit = amphora_rate_limit.AmphoraBuildRateLimit()
class ComputeCreate(BaseComputeTask):
"""Create the compute instance for a new amphora."""
def execute(self, amphora_id, ports=None, config_drive_files=None,
def execute(self, amphora_id,
build_type_priority=constants.LB_CREATE_NORMAL_PRIORITY,
ports=None, config_drive_files=None,
server_group_id=None):
"""Create an amphora
@ -65,6 +69,10 @@ class ComputeCreate(BaseComputeTask):
key_name = None if not ssh_access else ssh_key
try:
if CONF.haproxy_amphora.build_rate_limit != -1:
self.rate_limit.add_to_build_request_queue(
amphora_id, build_type_priority)
agent_cfg = agent_jinja_cfg.AgentJinjaTemplater()
config_drive_files['/etc/octavia/amphora-agent.conf'] = (
agent_cfg.build_agent_config(amphora_id))
@ -115,8 +123,9 @@ class ComputeCreate(BaseComputeTask):
class CertComputeCreate(ComputeCreate):
def execute(self, amphora_id, server_pem, ports=None,
server_group_id=None):
def execute(self, amphora_id, server_pem,
build_type_priority=constants.LB_CREATE_NORMAL_PRIORITY,
ports=None, server_group_id=None):
"""Create an amphora
:returns: an amphora
@ -129,7 +138,8 @@ class CertComputeCreate(ComputeCreate):
'/etc/octavia/certs/server.pem': server_pem,
'/etc/octavia/certs/client_ca.pem': ca}
return super(CertComputeCreate, self).execute(
amphora_id, ports=ports, config_drive_files=config_drive_files,
amphora_id, build_type_priority=build_type_priority,
ports=ports, config_drive_files=config_drive_files,
server_group_id=server_group_id)
@ -167,7 +177,7 @@ class ComputeDelete(BaseComputeTask):
class ComputeWait(BaseComputeTask):
"""Wait for the compute driver to mark the amphora active."""
def execute(self, compute_id):
def execute(self, compute_id, amphora_id):
"""Wait for the compute driver to mark the amphora active
:raises: Generic exception if the amphora is not active
@ -176,6 +186,8 @@ class ComputeWait(BaseComputeTask):
for i in range(CONF.controller_worker.amp_active_retries):
amp = self.compute.get_amphora(compute_id)
if amp.status == constants.ACTIVE:
if CONF.haproxy_amphora.build_rate_limit != -1:
self.rate_limit.remove_from_build_req_queue(amphora_id)
return amp
elif amp.status == constants.ERROR:
raise exceptions.ComputeBuildException()

View File

@ -0,0 +1,64 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""create_amphora_build_rate_limit_tables
Revision ID: fc5582da7d8a
Revises: 443fe6676637
Create Date: 2016-04-07 19:42:28.171902
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import sql
# revision identifiers, used by Alembic.
revision = 'fc5582da7d8a'
down_revision = '443fe6676637'
def upgrade():
op.create_table(
u'amphora_build_slots',
sa.Column(u'id', sa.Integer(), primary_key=True),
sa.Column(u'slots_used', sa.Integer(), default=0)
)
# Create temporary table for table data seeding
insert_table = sql.table(
u'amphora_build_slots',
sql.column(u'id', sa.Integer),
sql.column(u'slots_used', sa.Integer)
)
op.bulk_insert(
insert_table,
[
{'id': 1, 'slots_used': 0}
]
)
op.create_table(
u'amphora_build_request',
sa.Column(u'amphora_id', sa.String(36), nullable=True,
primary_key=True),
sa.Column(u'priority', sa.Integer()),
sa.Column(u'created_time', sa.DateTime(timezone=True), nullable=False),
sa.Column(u'status', sa.String(16), default='WAITING', nullable=False)
)
def downgrade():
pass

View File

@ -84,6 +84,24 @@ class L7PolicyAction(base_models.BASE, base_models.LookupTableMixin):
__tablename__ = "l7policy_action"
class AmphoraBuildSlots(base_models.BASE):
__tablename__ = "amphora_build_slots"
id = sa.Column(sa.Integer(), primary_key=True)
slots_used = sa.Column(sa.Integer())
class AmphoraBuildRequest(base_models.BASE):
__tablename__ = "amphora_build_request"
amphora_id = sa.Column(sa.String(36), nullable=True, primary_key=True)
priority = sa.Column(sa.Integer())
created_time = sa.Column(sa.DateTime, default=func.now(), nullable=False)
status = sa.Column(sa.String(16), default='WAITING', nullable=False)
class SessionPersistence(base_models.BASE):
__data_model__ = data_models.SessionPersistence

View File

@ -142,6 +142,8 @@ class Repositories(object):
self.vrrpgroup = VRRPGroupRepository()
self.l7rule = L7RuleRepository()
self.l7policy = L7PolicyRepository()
self.amp_build_slots = AmphoraBuildSlotsRepository()
self.amp_build_req = AmphoraBuildReqRepository()
self.quotas = QuotasRepository()
def create_load_balancer_and_vip(self, session, lb_dict, vip_dict):
@ -900,6 +902,73 @@ class AmphoraRepository(BaseRepository):
return amp.to_data_model()
class AmphoraBuildReqRepository(BaseRepository):
model_class = models.AmphoraBuildRequest
def add_to_build_queue(self, session, amphora_id=None, priority=None):
"""Adds the build request to the table."""
with session.begin(subtransactions=True):
model = self.model_class(amphora_id=amphora_id, priority=priority)
session.add(model)
def update_req_status(self, session, amphora_id=None):
"""Updates the request status."""
with session.begin(subtransactions=True):
(session.query(self.model_class)
.filter_by(amphora_id=amphora_id)
.update({self.model_class.status: 'BUILDING'}))
def get_highest_priority_build_req(self, session):
"""Fetches build request with highest priority and least created_time.
priority 20 = failover (highest)
priority 40 = create_loadbalancer
priority 60 = sparespool (least)
:param session: A Sql Alchemy database session.
:returns amphora_id corresponding to highest priority and least created
time in 'WAITING' status.
"""
with session.begin(subtransactions=True):
return (session.query(self.model_class.amphora_id)
.order_by(self.model_class.status.desc())
.order_by(self.model_class.priority.asc())
.order_by(self.model_class.created_time.asc())
.first())[0]
def delete_all(self, session):
"Deletes all the build requests."
with session.begin(subtransactions=True):
session.query(self.model_class).delete()
class AmphoraBuildSlotsRepository(BaseRepository):
model_class = models.AmphoraBuildSlots
def get_used_build_slots_count(self, session):
"""Gets the number of build slots in use.
:returns: Number of current build slots.
"""
with session.begin(subtransactions=True):
count = session.query(self.model_class.slots_used).one()
return count[0]
def update_count(self, session, action='increment'):
"""Increments/Decrements/Resets the number of build_slots used."""
with session.begin(subtransactions=True):
if action == 'increment':
session.query(self.model_class).filter_by(id=1).update(
{self.model_class.slots_used:
self.get_used_build_slots_count(session) + 1})
elif action == 'decrement':
session.query(self.model_class).filter_by(id=1).update(
{self.model_class.slots_used:
self.get_used_build_slots_count(session) - 1})
elif action == 'reset':
session.query(self.model_class).filter_by(id=1).update(
{self.model_class.slots_used: 0})
class SNIRepository(BaseRepository):
model_class = models.SNI

View File

@ -99,7 +99,7 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
'session_persistence', 'pool', 'member', 'listener',
'listener_stats', 'amphora', 'sni',
'amphorahealth', 'vrrpgroup', 'l7rule', 'l7policy',
'quotas')
'amp_build_slots', 'amp_build_req', 'quotas')
for repo_attr in repo_attr_names:
single_repo = getattr(self.repos, repo_attr, None)
message = ("Class Repositories should have %s instance"

View File

@ -51,7 +51,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(0, len(amp_flow.requires))
self.assertEqual(1, len(amp_flow.requires))
def test_get_create_amphora_flow_cert(self, mock_get_net_driver):
self.AmpFlow = amphora_flows.AmphoraFlows()
@ -65,7 +65,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(0, len(amp_flow.requires))
self.assertEqual(1, len(amp_flow.requires))
def test_get_create_amphora_for_lb_flow(self, mock_get_net_driver):
@ -83,7 +83,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(2, len(amp_flow.requires))
def test_get_cert_create_amphora_for_lb_flow(self, mock_get_net_driver):
@ -103,7 +103,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(2, len(amp_flow.requires))
def test_get_cert_master_create_amphora_for_lb_flow(
self, mock_get_net_driver):
@ -124,7 +124,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(2, len(amp_flow.requires))
def test_get_cert_master_rest_anti_affinity_create_amphora_for_lb_flow(
self, mock_get_net_driver):
@ -143,7 +143,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.conf.config(group="nova", enable_anti_affinity=False)
def test_get_cert_backup_create_amphora_for_lb_flow(
@ -164,7 +164,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(2, len(amp_flow.requires))
def test_get_cert_bogus_create_amphora_for_lb_flow(
self, mock_get_net_driver):
@ -184,7 +184,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(2, len(amp_flow.requires))
def test_get_cert_backup_rest_anti_affinity_create_amphora_for_lb_flow(
self, mock_get_net_driver):
@ -202,7 +202,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.conf.config(group="nova", enable_anti_affinity=False)
def test_get_delete_amphora_flow(self, mock_get_net_driver):
@ -253,7 +253,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
@ -273,7 +273,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
@ -293,7 +293,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
@ -313,7 +313,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
def test_get_failover_flow_spare(self, mock_get_net_driver):

View File

@ -212,7 +212,7 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
create_flow.provides)
self.assertEqual(2, len(create_flow.requires))
self.assertEqual(3, len(create_flow.requires))
self.assertEqual(12, len(create_flow.provides),
create_flow.provides)
@ -240,6 +240,6 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
create_flow.provides)
self.assertEqual(2, len(create_flow.requires))
self.assertEqual(3, len(create_flow.requires))
self.assertEqual(12, len(create_flow.provides),
create_flow.provides)

View File

@ -59,17 +59,22 @@ _port.id = PORT_ID
class TestComputeTasks(base.TestCase):
def setUp(self):
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker", amp_flavor_id=AMP_FLAVOR_ID)
conf.config(group="controller_worker", amp_image_id=AMP_IMAGE_ID)
conf.config(group="controller_worker", amp_image_tag=AMP_IMAGE_TAG)
conf.config(group="controller_worker",
amp_ssh_key_name=AMP_SSH_KEY_NAME)
conf.config(group="controller_worker", amp_boot_network_list=AMP_NET)
conf.config(group="controller_worker", amp_active_wait_sec=AMP_WAIT)
conf.config(group="controller_worker",
amp_secgroup_list=AMP_SEC_GROUPS)
conf.config(group="controller_worker", amp_image_owner_id='')
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
self.conf.config(
group="controller_worker", amp_flavor_id=AMP_FLAVOR_ID)
self.conf.config(
group="controller_worker", amp_image_id=AMP_IMAGE_ID)
self.conf.config(
group="controller_worker", amp_image_tag=AMP_IMAGE_TAG)
self.conf.config(
group="controller_worker", amp_ssh_key_name=AMP_SSH_KEY_NAME)
self.conf.config(
group="controller_worker", amp_boot_network_list=AMP_NET)
self.conf.config(
group="controller_worker", amp_active_wait_sec=AMP_WAIT)
self.conf.config(
group="controller_worker", amp_secgroup_list=AMP_SEC_GROUPS)
self.conf.config(group="controller_worker", amp_image_owner_id='')
_amphora_mock.id = AMPHORA_ID
_amphora_mock.status = constants.AMPHORA_ALLOCATED
@ -87,9 +92,8 @@ class TestComputeTasks(base.TestCase):
def test_compute_create(self, mock_driver, mock_conf, mock_jinja):
image_owner_id = uuidutils.generate_uuid()
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker",
amp_image_owner_id=image_owner_id)
self.conf.config(
group="controller_worker", amp_image_owner_id=image_owner_id)
createcompute = compute_tasks.ComputeCreate()
@ -114,6 +118,7 @@ class TestComputeTasks(base.TestCase):
user_data=None,
server_group_id=SERVER_GRPOUP_ID)
# Make sure it returns the expected compute_id
self.assertEqual(COMPUTE_ID, compute_id)
# Test that a build exception is raised
@ -138,9 +143,6 @@ class TestComputeTasks(base.TestCase):
createcompute.revert(COMPUTE_ID, _amphora_mock.id)
conf.config(group="controller_worker",
amp_image_owner_id='')
@mock.patch('jinja2.Environment.get_template')
@mock.patch('octavia.amphorae.backends.agent.'
'agent_jinja_cfg.AgentJinjaTemplater.'
@ -152,8 +154,8 @@ class TestComputeTasks(base.TestCase):
def test_compute_create_user_data(self, mock_driver,
mock_ud_conf, mock_conf, mock_jinja):
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker", user_data_config_drive=True)
self.conf.config(
group="controller_worker", user_data_config_drive=True)
mock_ud_conf.return_value = 'test_ud_conf'
createcompute = compute_tasks.ComputeCreate()
@ -176,6 +178,7 @@ class TestComputeTasks(base.TestCase):
user_data='test_ud_conf',
server_group_id=None)
# Make sure it returns the expected compute_id
self.assertEqual(COMPUTE_ID, compute_id)
# Test that a build exception is raised
@ -199,8 +202,6 @@ class TestComputeTasks(base.TestCase):
# Test that a delete exception is not raised
createcompute.revert(COMPUTE_ID, _amphora_mock.id)
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker", user_data_config_drive=False)
@mock.patch('jinja2.Environment.get_template')
@mock.patch('octavia.amphorae.backends.agent.'
@ -213,8 +214,10 @@ class TestComputeTasks(base.TestCase):
createcompute = compute_tasks.ComputeCreate()
mock_driver.build.return_value = COMPUTE_ID
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker", amp_ssh_access_allowed=False)
self.conf.config(
group="controller_worker", amp_ssh_access_allowed=False)
self.conf.config(
group="controller_worker", user_data_config_drive=False)
# Test execute()
compute_id = createcompute.execute(_amphora_mock.id, ports=[_port],
@ -320,12 +323,16 @@ class TestComputeTasks(base.TestCase):
createcompute.revert(COMPUTE_ID, _amphora_mock.id)
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.remove_from_build_req_queue')
@mock.patch('stevedore.driver.DriverManager.driver')
@mock.patch('time.sleep')
def test_compute_wait(self,
mock_time_sleep,
mock_driver):
mock_driver,
mock_remove_from_build_queue):
self.conf.config(group='haproxy_amphora', build_rate_limit=5)
_amphora_mock.compute_id = COMPUTE_ID
_amphora_mock.status = constants.ACTIVE
_amphora_mock.lb_network_ip = LB_NET_IP
@ -333,7 +340,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _amphora_mock
computewait = compute_tasks.ComputeWait()
computewait.execute(COMPUTE_ID)
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
@ -341,12 +348,18 @@ class TestComputeTasks(base.TestCase):
self.assertRaises(exceptions.ComputeWaitTimeoutException,
computewait.execute,
_amphora_mock)
_amphora_mock, AMPHORA_ID)
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.remove_from_build_req_queue')
@mock.patch('stevedore.driver.DriverManager.driver')
@mock.patch('time.sleep')
def test_compute_wait_error_status(self, mock_time_sleep, mock_driver):
def test_compute_wait_error_status(self,
mock_time_sleep,
mock_driver,
mock_remove_from_build_queue):
self.conf.config(group='haproxy_amphora', build_rate_limit=5)
_amphora_mock.compute_id = COMPUTE_ID
_amphora_mock.status = constants.ACTIVE
_amphora_mock.lb_network_ip = LB_NET_IP
@ -354,7 +367,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _amphora_mock
computewait = compute_tasks.ComputeWait()
computewait.execute(COMPUTE_ID)
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
@ -362,7 +375,27 @@ class TestComputeTasks(base.TestCase):
self.assertRaises(exceptions.ComputeBuildException,
computewait.execute,
_amphora_mock)
_amphora_mock, AMPHORA_ID)
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.remove_from_build_req_queue')
@mock.patch('stevedore.driver.DriverManager.driver')
@mock.patch('time.sleep')
def test_compute_wait_skipped(self,
mock_time_sleep,
mock_driver,
mock_remove_from_build_queue):
_amphora_mock.compute_id = COMPUTE_ID
_amphora_mock.status = constants.ACTIVE
_amphora_mock.lb_network_ip = LB_NET_IP
mock_driver.get_amphora.return_value = _amphora_mock
computewait = compute_tasks.ComputeWait()
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
mock_remove_from_build_queue.assert_not_called()
@mock.patch('stevedore.driver.DriverManager.driver')
def test_delete_amphorae_on_load_balancer(self, mock_driver):
@ -397,6 +430,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.create_server_group.assert_called_once_with(
'octavia-lb-123', 'anti-affinity')
# Make sure it returns the expected server group_id
self.assertEqual(server_group_test_id, sg_id)
# Test revert()

View File

@ -0,0 +1,129 @@
# Copyright 2016 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.controller.worker import amphora_rate_limit
import octavia.tests.unit.base as base
AMP_ID = uuidutils.generate_uuid()
BUILD_PRIORITY = 40
USED_BUILD_SLOTS = 0
class TestAmphoraBuildRateLimit(base.TestCase):
def setUp(self):
super(TestAmphoraBuildRateLimit, self).setUp()
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
self.rate_limit = amphora_rate_limit.AmphoraBuildRateLimit()
self.amp_build_slots_repo = mock.MagicMock()
self.amp_build_req_repo = mock.MagicMock()
self.conf.config(group='haproxy_amphora', build_rate_limit=1)
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.wait_for_build_slot')
@mock.patch('octavia.db.repositories.AmphoraBuildReqRepository'
'.add_to_build_queue')
def test_add_to_build_request_queue(self,
mock_add_to_build_queue,
mock_wait_for_build_slot):
self.rate_limit.add_to_build_request_queue(AMP_ID, BUILD_PRIORITY)
mock_add_to_build_queue.assert_called_once()
mock_wait_for_build_slot.assert_called_once()
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository'
'.get_used_build_slots_count',
return_value=USED_BUILD_SLOTS)
def test_has_build_slot(self, mock_get_used_build_slots_count):
result = self.rate_limit.has_build_slot()
mock_get_used_build_slots_count.assert_called_once()
self.assertTrue(result)
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.db.repositories.AmphoraBuildReqRepository'
'.get_highest_priority_build_req', return_value=AMP_ID)
def test_has_highest_priority(self, mock_get_highest_priority_build_req):
result = self.rate_limit.has_highest_priority(AMP_ID)
mock_get_highest_priority_build_req.assert_called_once()
self.assertTrue(result)
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.db.repositories.AmphoraBuildReqRepository'
'.update_req_status')
@mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository'
'.update_count')
def test_update_build_status_and_available_build_slots(self,
mock_update_count,
mock_update_status):
self.rate_limit.update_build_status_and_available_build_slots(AMP_ID)
mock_update_count.assert_called_once()
mock_update_status.assert_called_once()
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.db.repositories.AmphoraBuildReqRepository.delete')
@mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository'
'.update_count')
def test_remove_from_build_req_queue(self,
mock_update_count,
mock_delete):
self.rate_limit.remove_from_build_req_queue(AMP_ID)
mock_update_count.assert_called_once()
mock_delete.assert_called_once()
@mock.patch('octavia.db.api.get_session', mock.MagicMock())
@mock.patch('octavia.db.repositories.AmphoraBuildReqRepository'
'.delete_all')
@mock.patch('octavia.db.repositories.AmphoraBuildSlotsRepository'
'.update_count')
def test_remove_all_from_build_req_queue(self,
mock_update_count,
mock_delete_all):
self.rate_limit.remove_all_from_build_req_queue()
mock_update_count.assert_called_once()
mock_delete_all.assert_called_once()
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.has_build_slot', return_value=True)
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.has_highest_priority',
return_value=True)
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.'
'update_build_status_and_available_build_slots')
@mock.patch('octavia.controller.worker.amphora_rate_limit'
'.AmphoraBuildRateLimit.remove_all_from_build_req_queue')
@mock.patch('time.sleep')
def test_wait_for_build_slot(self,
mock_time_sleep,
mock_remove_all,
mock_update_status_and_slots_count,
mock_has_high_priority,
mock_has_build_slot):
self.rate_limit.wait_for_build_slot(AMP_ID)
self.assertTrue(mock_has_build_slot.called)
self.assertTrue(mock_has_high_priority.called)

View File

@ -133,7 +133,10 @@ class TestControllerWorker(base.TestCase):
amp = cw.create_amphora()
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with('TEST'))
assert_called_once_with(
'TEST',
store={constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY}))
_flow_mock.run.assert_called_once_with()
@ -400,7 +403,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load.return_value = mock_eng
store = {
constants.LOADBALANCER_ID: LB_ID,
'update_dict': {'topology': constants.TOPOLOGY_SINGLE}
'update_dict': {'topology': constants.TOPOLOGY_SINGLE},
constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY
}
setattr(mock_lb_repo_get.return_value, 'listeners', [])
@ -441,7 +445,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load.return_value = mock_eng
store = {
constants.LOADBALANCER_ID: LB_ID,
'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY}
'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY},
constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY
}
setattr(mock_lb_repo_get.return_value, 'listeners', [])
@ -483,7 +488,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load.return_value = mock_eng
store = {
constants.LOADBALANCER_ID: LB_ID,
'update_dict': {'topology': constants.TOPOLOGY_SINGLE}
'update_dict': {'topology': constants.TOPOLOGY_SINGLE},
constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY
}
cw = controller_worker.ControllerWorker()
@ -532,7 +538,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load.return_value = mock_eng
store = {
constants.LOADBALANCER_ID: LB_ID,
'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY}
'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY},
constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY
}
cw = controller_worker.ControllerWorker()
@ -1081,7 +1088,10 @@ class TestControllerWorker(base.TestCase):
_flow_mock,
store={constants.FAILED_AMPHORA: _amphora_mock,
constants.LOADBALANCER_ID:
_amphora_mock.load_balancer_id}))
_amphora_mock.load_balancer_id,
constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_FAILOVER_PRIORITY
}))
_flow_mock.run.assert_called_once_with()