From a205ab3ebe4ae14668f25328c062f9383d6c69cb Mon Sep 17 00:00:00 2001
From: Michael Johnson <johnsomor@gmail.com>
Date: Mon, 11 Mar 2019 17:06:15 -0700
Subject: [PATCH] Limit spares pool to the spare_amphora_pool_size

This patch fixes a bug where the housekeeping controllers may launch
more spare amphora than the spare_amphora_pool_size setting.

Story: 2003094
Task: 23186

Change-Id: I4c98b3442d2471662488184fa7e91ac64ec33279
---
 .../controller/housekeeping/house_keeping.py  | 52 +++++++++++++------
 .../6ffc710674ef_spares_pool_table.py         | 35 +++++++++++++
 octavia/db/models.py                          |  7 +++
 octavia/db/repositories.py                    | 15 ++++++
 .../tests/functional/db/test_repositories.py  |  2 +-
 .../housekeeping/test_house_keeping.py        | 15 ++++++
 6 files changed, 108 insertions(+), 18 deletions(-)
 create mode 100644 octavia/db/migration/alembic_migrations/versions/6ffc710674ef_spares_pool_table.py

diff --git a/octavia/controller/housekeeping/house_keeping.py b/octavia/controller/housekeeping/house_keeping.py
index 48bc475b65..3357403810 100644
--- a/octavia/controller/housekeeping/house_keeping.py
+++ b/octavia/controller/housekeeping/house_keeping.py
@@ -17,6 +17,7 @@ import datetime
 
 from oslo_config import cfg
 from oslo_log import log as logging
+from oslo_utils import timeutils
 from sqlalchemy.orm import exc as sqlalchemy_exceptions
 
 from octavia.common import constants
@@ -31,6 +32,7 @@ CONF = cfg.CONF
 class SpareAmphora(object):
     def __init__(self):
         self.amp_repo = repo.AmphoraRepository()
+        self.spares_repo = repo.SparesPoolRepository()
         self.cw = cw.ControllerWorker()
 
     def spare_check(self):
@@ -38,26 +40,42 @@ class SpareAmphora(object):
 
         If it's less than the requirement, starts new amphora.
         """
+        lock_session = db_api.get_session(autocommit=False)
         session = db_api.get_session()
-        conf_spare_cnt = CONF.house_keeping.spare_amphora_pool_size
-        curr_spare_cnt = self.amp_repo.get_spare_amphora_count(session)
-        LOG.debug("Required Spare Amphora count : %d", conf_spare_cnt)
-        LOG.debug("Current Spare Amphora count : %d", curr_spare_cnt)
-        diff_count = conf_spare_cnt - curr_spare_cnt
+        try:
+            # Lock the spares_pool record for read and write
+            spare_amp_row = self.spares_repo.get_for_update(lock_session)
 
-        # When the current spare amphora is less than required
-        if diff_count > 0:
-            LOG.info("Initiating creation of %d spare amphora.", diff_count)
+            conf_spare_cnt = CONF.house_keeping.spare_amphora_pool_size
+            curr_spare_cnt = self.amp_repo.get_spare_amphora_count(session)
+            LOG.debug("Required Spare Amphora count : %d", conf_spare_cnt)
+            LOG.debug("Current Spare Amphora count : %d", curr_spare_cnt)
+            diff_count = conf_spare_cnt - curr_spare_cnt
 
-            # Call Amphora Create Flow diff_count times
-            with futures.ThreadPoolExecutor(
-                    max_workers=CONF.house_keeping.spare_amphora_pool_size
-            ) as executor:
-                for i in range(1, diff_count + 1):
-                    LOG.debug("Starting amphorae number %d ...", i)
-                    executor.submit(self.cw.create_amphora)
-        else:
-            LOG.debug("Current spare amphora count satisfies the requirement")
+            # When the current spare amphora is less than required
+            amp_booting = []
+            if diff_count > 0:
+                LOG.info("Initiating creation of %d spare amphora.",
+                         diff_count)
+
+                # Call Amphora Create Flow diff_count times
+                with futures.ThreadPoolExecutor(
+                        max_workers=CONF.house_keeping.spare_amphora_pool_size
+                ) as executor:
+                    for i in range(1, diff_count + 1):
+                        LOG.debug("Starting amphorae number %d ...", i)
+                        amp_booting.append(
+                            executor.submit(self.cw.create_amphora))
+            else:
+                LOG.debug("Current spare amphora count satisfies the "
+                          "requirement")
+
+            # Wait for the amphora boot threads to finish
+            futures.wait(amp_booting)
+            spare_amp_row.updated_at = timeutils.utcnow()
+            lock_session.commit()
+        except Exception:
+            lock_session.rollback()
 
 
 class DatabaseCleanup(object):
diff --git a/octavia/db/migration/alembic_migrations/versions/6ffc710674ef_spares_pool_table.py b/octavia/db/migration/alembic_migrations/versions/6ffc710674ef_spares_pool_table.py
new file mode 100644
index 0000000000..ff42e65497
--- /dev/null
+++ b/octavia/db/migration/alembic_migrations/versions/6ffc710674ef_spares_pool_table.py
@@ -0,0 +1,35 @@
+#    Copyright 2019 Michael Johnson
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Spares pool table
+
+Revision ID: 6ffc710674ef
+Revises: 7432f1d4ea83
+Create Date: 2019-03-11 10:45:43.296236
+
+"""
+
+from alembic import op
+import sqlalchemy as sa
+
+# revision identifiers, used by Alembic.
+revision = '6ffc710674ef'
+down_revision = '7432f1d4ea83'
+
+
+def upgrade():
+    op.create_table(
+        u'spares_pool',
+        sa.Column(u'updated_at', sa.DateTime(), nullable=True,
+                  server_default=sa.func.current_timestamp()))
diff --git a/octavia/db/models.py b/octavia/db/models.py
index 46ba1b1d4e..7d2fe54f98 100644
--- a/octavia/db/models.py
+++ b/octavia/db/models.py
@@ -782,3 +782,10 @@ class ClientAuthenticationMode(base_models.BASE):
     __tablename__ = "client_authentication_mode"
 
     name = sa.Column(sa.String(10), primary_key=True, nullable=False)
+
+
+class SparesPool(base_models.BASE):
+
+    __tablename__ = "spares_pool"
+
+    updated_at = sa.Column(sa.DateTime, primary_key=True, nullable=True)
diff --git a/octavia/db/repositories.py b/octavia/db/repositories.py
index 28ea5e643c..67fdd9b8a5 100644
--- a/octavia/db/repositories.py
+++ b/octavia/db/repositories.py
@@ -201,6 +201,7 @@ class Repositories(object):
         self.quotas = QuotasRepository()
         self.flavor = FlavorRepository()
         self.flavor_profile = FlavorProfileRepository()
+        self.spares_pool = SparesPoolRepository()
 
     def create_load_balancer_and_vip(self, session, lb_dict, vip_dict):
         """Inserts load balancer and vip entities into the database.
@@ -1811,3 +1812,17 @@ class FlavorRepository(BaseRepository):
 
 class FlavorProfileRepository(BaseRepository):
     model_class = models.FlavorProfile
+
+
+class SparesPoolRepository(BaseRepository):
+    model_class = models.SparesPool
+
+    def get_for_update(self, lock_session):
+        """Queries and locks the SparesPool record.
+
+        This call will query for the SparesPool table record and lock it
+        so that other processes cannot read or write it.
+        :returns: The locked SparesPool row (a models.SparesPool object).
+        """
+        row = lock_session.query(models.SparesPool).with_for_update().one()
+        return row
diff --git a/octavia/tests/functional/db/test_repositories.py b/octavia/tests/functional/db/test_repositories.py
index 9059a40c3a..975818052c 100644
--- a/octavia/tests/functional/db/test_repositories.py
+++ b/octavia/tests/functional/db/test_repositories.py
@@ -119,7 +119,7 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
                            'listener_stats', 'amphora', 'sni',
                            'amphorahealth', 'vrrpgroup', 'l7rule', 'l7policy',
                            'amp_build_slots', 'amp_build_req', 'quotas',
-                           'flavor', 'flavor_profile')
+                           'flavor', 'flavor_profile', 'spares_pool')
         for repo_attr in repo_attr_names:
             single_repo = getattr(self.repos, repo_attr, None)
             message = ("Class Repositories should have %s instance"
diff --git a/octavia/tests/unit/controller/housekeeping/test_house_keeping.py b/octavia/tests/unit/controller/housekeeping/test_house_keeping.py
index 402e5037f1..b1eb05610f 100644
--- a/octavia/tests/unit/controller/housekeeping/test_house_keeping.py
+++ b/octavia/tests/unit/controller/housekeeping/test_house_keeping.py
@@ -83,6 +83,21 @@ class TestSpareCheck(base.TestCase):
         self.assertEqual(0, DIFF_CNT)
         self.assertEqual(DIFF_CNT, self.cw.create_amphora.call_count)
 
+    @mock.patch('octavia.db.repositories.SparesPoolRepository.get_for_update')
+    @mock.patch('octavia.db.api.get_session')
+    def test_spare_check_rollback(self, mock_session, mock_update):
+        """When spare amphora count meets the requirement."""
+        lock_session = mock.MagicMock()
+        session = mock.MagicMock()
+        mock_session.side_effect = [lock_session, session]
+        mock_update.side_effect = [Exception('boom')]
+#        self.CONF.config(group="house_keeping",
+#                         spare_amphora_pool_size=self.FAKE_CNF_SPAR2)
+#        self.amp_repo.get_spare_amphora_count.return_value = (
+#            self.FAKE_CUR_SPAR2)
+        self.spare_amp.spare_check()
+        lock_session.rollback.assert_called_once()
+
 
 class TestDatabaseCleanup(base.TestCase):
     FAKE_IP = "10.0.0.1"