Limit spares pool to the spare_amphora_pool_size
This patch fixes a bug where the housekeeping controllers may launch more spare amphora than the spare_amphora_pool_size setting. Story: 2003094 Task 23186 Change-Id: I4c98b3442d2471662488184fa7e91ac64ec33279
This commit is contained in:
parent
5b3b0c6b16
commit
a205ab3ebe
octavia
controller/housekeeping
db
tests
@ -17,6 +17,7 @@ import datetime
|
|||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
from oslo_utils import timeutils
|
||||||
from sqlalchemy.orm import exc as sqlalchemy_exceptions
|
from sqlalchemy.orm import exc as sqlalchemy_exceptions
|
||||||
|
|
||||||
from octavia.common import constants
|
from octavia.common import constants
|
||||||
@ -31,6 +32,7 @@ CONF = cfg.CONF
|
|||||||
class SpareAmphora(object):
|
class SpareAmphora(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.amp_repo = repo.AmphoraRepository()
|
self.amp_repo = repo.AmphoraRepository()
|
||||||
|
self.spares_repo = repo.SparesPoolRepository()
|
||||||
self.cw = cw.ControllerWorker()
|
self.cw = cw.ControllerWorker()
|
||||||
|
|
||||||
def spare_check(self):
|
def spare_check(self):
|
||||||
@ -38,26 +40,42 @@ class SpareAmphora(object):
|
|||||||
|
|
||||||
If it's less than the requirement, starts new amphora.
|
If it's less than the requirement, starts new amphora.
|
||||||
"""
|
"""
|
||||||
|
lock_session = db_api.get_session(autocommit=False)
|
||||||
session = db_api.get_session()
|
session = db_api.get_session()
|
||||||
conf_spare_cnt = CONF.house_keeping.spare_amphora_pool_size
|
try:
|
||||||
curr_spare_cnt = self.amp_repo.get_spare_amphora_count(session)
|
# Lock the spares_pool record for read and write
|
||||||
LOG.debug("Required Spare Amphora count : %d", conf_spare_cnt)
|
spare_amp_row = self.spares_repo.get_for_update(lock_session)
|
||||||
LOG.debug("Current Spare Amphora count : %d", curr_spare_cnt)
|
|
||||||
diff_count = conf_spare_cnt - curr_spare_cnt
|
|
||||||
|
|
||||||
# When the current spare amphora is less than required
|
conf_spare_cnt = CONF.house_keeping.spare_amphora_pool_size
|
||||||
if diff_count > 0:
|
curr_spare_cnt = self.amp_repo.get_spare_amphora_count(session)
|
||||||
LOG.info("Initiating creation of %d spare amphora.", diff_count)
|
LOG.debug("Required Spare Amphora count : %d", conf_spare_cnt)
|
||||||
|
LOG.debug("Current Spare Amphora count : %d", curr_spare_cnt)
|
||||||
|
diff_count = conf_spare_cnt - curr_spare_cnt
|
||||||
|
|
||||||
# Call Amphora Create Flow diff_count times
|
# When the current spare amphora is less than required
|
||||||
with futures.ThreadPoolExecutor(
|
amp_booting = []
|
||||||
max_workers=CONF.house_keeping.spare_amphora_pool_size
|
if diff_count > 0:
|
||||||
) as executor:
|
LOG.info("Initiating creation of %d spare amphora.",
|
||||||
for i in range(1, diff_count + 1):
|
diff_count)
|
||||||
LOG.debug("Starting amphorae number %d ...", i)
|
|
||||||
executor.submit(self.cw.create_amphora)
|
# Call Amphora Create Flow diff_count times
|
||||||
else:
|
with futures.ThreadPoolExecutor(
|
||||||
LOG.debug("Current spare amphora count satisfies the requirement")
|
max_workers=CONF.house_keeping.spare_amphora_pool_size
|
||||||
|
) as executor:
|
||||||
|
for i in range(1, diff_count + 1):
|
||||||
|
LOG.debug("Starting amphorae number %d ...", i)
|
||||||
|
amp_booting.append(
|
||||||
|
executor.submit(self.cw.create_amphora))
|
||||||
|
else:
|
||||||
|
LOG.debug("Current spare amphora count satisfies the "
|
||||||
|
"requirement")
|
||||||
|
|
||||||
|
# Wait for the amphora boot threads to finish
|
||||||
|
futures.wait(amp_booting)
|
||||||
|
spare_amp_row.updated_at = timeutils.utcnow()
|
||||||
|
lock_session.commit()
|
||||||
|
except Exception:
|
||||||
|
lock_session.rollback()
|
||||||
|
|
||||||
|
|
||||||
class DatabaseCleanup(object):
|
class DatabaseCleanup(object):
|
||||||
|
@ -0,0 +1,35 @@
|
|||||||
|
# Copyright 2019 Michael Johnson
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""Spares pool table
|
||||||
|
|
||||||
|
Revision ID: 6ffc710674ef
|
||||||
|
Revises: 7432f1d4ea83
|
||||||
|
Create Date: 2019-03-11 10:45:43.296236
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '6ffc710674ef'
|
||||||
|
down_revision = '7432f1d4ea83'
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
op.create_table(
|
||||||
|
u'spares_pool',
|
||||||
|
sa.Column(u'updated_at', sa.DateTime(), nullable=True,
|
||||||
|
server_default=sa.func.current_timestamp()))
|
@ -782,3 +782,10 @@ class ClientAuthenticationMode(base_models.BASE):
|
|||||||
__tablename__ = "client_authentication_mode"
|
__tablename__ = "client_authentication_mode"
|
||||||
|
|
||||||
name = sa.Column(sa.String(10), primary_key=True, nullable=False)
|
name = sa.Column(sa.String(10), primary_key=True, nullable=False)
|
||||||
|
|
||||||
|
|
||||||
|
class SparesPool(base_models.BASE):
|
||||||
|
|
||||||
|
__tablename__ = "spares_pool"
|
||||||
|
|
||||||
|
updated_at = sa.Column(sa.DateTime, primary_key=True, nullable=True)
|
||||||
|
@ -201,6 +201,7 @@ class Repositories(object):
|
|||||||
self.quotas = QuotasRepository()
|
self.quotas = QuotasRepository()
|
||||||
self.flavor = FlavorRepository()
|
self.flavor = FlavorRepository()
|
||||||
self.flavor_profile = FlavorProfileRepository()
|
self.flavor_profile = FlavorProfileRepository()
|
||||||
|
self.spares_pool = SparesPoolRepository()
|
||||||
|
|
||||||
def create_load_balancer_and_vip(self, session, lb_dict, vip_dict):
|
def create_load_balancer_and_vip(self, session, lb_dict, vip_dict):
|
||||||
"""Inserts load balancer and vip entities into the database.
|
"""Inserts load balancer and vip entities into the database.
|
||||||
@ -1811,3 +1812,17 @@ class FlavorRepository(BaseRepository):
|
|||||||
|
|
||||||
class FlavorProfileRepository(BaseRepository):
|
class FlavorProfileRepository(BaseRepository):
|
||||||
model_class = models.FlavorProfile
|
model_class = models.FlavorProfile
|
||||||
|
|
||||||
|
|
||||||
|
class SparesPoolRepository(BaseRepository):
|
||||||
|
model_class = models.SparesPool
|
||||||
|
|
||||||
|
def get_for_update(self, lock_session):
|
||||||
|
"""Queries and locks the SparesPool record.
|
||||||
|
|
||||||
|
This call will query for the SparesPool table record and lock it
|
||||||
|
so that other processes cannot read or write it.
|
||||||
|
:returns: expected_spares_count, updated_at
|
||||||
|
"""
|
||||||
|
row = lock_session.query(models.SparesPool).with_for_update().one()
|
||||||
|
return row
|
||||||
|
@ -119,7 +119,7 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
|
|||||||
'listener_stats', 'amphora', 'sni',
|
'listener_stats', 'amphora', 'sni',
|
||||||
'amphorahealth', 'vrrpgroup', 'l7rule', 'l7policy',
|
'amphorahealth', 'vrrpgroup', 'l7rule', 'l7policy',
|
||||||
'amp_build_slots', 'amp_build_req', 'quotas',
|
'amp_build_slots', 'amp_build_req', 'quotas',
|
||||||
'flavor', 'flavor_profile')
|
'flavor', 'flavor_profile', 'spares_pool')
|
||||||
for repo_attr in repo_attr_names:
|
for repo_attr in repo_attr_names:
|
||||||
single_repo = getattr(self.repos, repo_attr, None)
|
single_repo = getattr(self.repos, repo_attr, None)
|
||||||
message = ("Class Repositories should have %s instance"
|
message = ("Class Repositories should have %s instance"
|
||||||
|
@ -83,6 +83,21 @@ class TestSpareCheck(base.TestCase):
|
|||||||
self.assertEqual(0, DIFF_CNT)
|
self.assertEqual(0, DIFF_CNT)
|
||||||
self.assertEqual(DIFF_CNT, self.cw.create_amphora.call_count)
|
self.assertEqual(DIFF_CNT, self.cw.create_amphora.call_count)
|
||||||
|
|
||||||
|
@mock.patch('octavia.db.repositories.SparesPoolRepository.get_for_update')
|
||||||
|
@mock.patch('octavia.db.api.get_session')
|
||||||
|
def test_spare_check_rollback(self, mock_session, mock_update):
|
||||||
|
"""When spare amphora count meets the requirement."""
|
||||||
|
lock_session = mock.MagicMock()
|
||||||
|
session = mock.MagicMock()
|
||||||
|
mock_session.side_effect = [lock_session, session]
|
||||||
|
mock_update.side_effect = [Exception('boom')]
|
||||||
|
# self.CONF.config(group="house_keeping",
|
||||||
|
# spare_amphora_pool_size=self.FAKE_CNF_SPAR2)
|
||||||
|
# self.amp_repo.get_spare_amphora_count.return_value = (
|
||||||
|
# self.FAKE_CUR_SPAR2)
|
||||||
|
self.spare_amp.spare_check()
|
||||||
|
lock_session.rollback.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
class TestDatabaseCleanup(base.TestCase):
|
class TestDatabaseCleanup(base.TestCase):
|
||||||
FAKE_IP = "10.0.0.1"
|
FAKE_IP = "10.0.0.1"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user