health manager service

This model is used to check amphora health
Add a column 'busy' and primary key  for data table amphora health
Add mutiprocessing code in cmd/health_manager, one for health check, the other is for UDP pacakge listening,

Co-Authored-By: Michael Johnson <johnsomor@gmail.com>
Co-Authored-By: min wang <swiftwangster@gmail.com>
Implements: blueprint health-manager
Change-Id: I8aeb6b82b58b59951a414e7c2e4c2c58c33a5d15
This commit is contained in:
minwang 2015-03-03 18:25:08 -08:00 committed by Carlos Garza
parent 2cfcf3eff1
commit f849f55e5e
14 changed files with 272 additions and 24 deletions

View File

@ -33,6 +33,11 @@
# to put the [database] section and its connection attribute in this
# configuration file.
[health_manager]
# failover_threads = 10
# interval = 3
# heartbeat_timeout = 10
[keystone_authtoken]
# auth_uri = https://localhost:5000/v3
# admin_user = octavia

View File

@ -12,16 +12,54 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import multiprocessing
import sys
import time
from oslo_config import cfg
from oslo_log import log as logging
from octavia.common import service
from octavia.controller.healthmanager import health_manager
from octavia.i18n import _LI
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
CONF.import_group('health_manager', 'octavia.common.config')
def HM_listener():
while True:
time.sleep(5)
# to do by Carlos
def HM_health_check():
while True:
time.sleep(CONF.health_manager.interval)
hm = health_manager.HealthManager()
hm.health_check()
def main():
service.prepare_service(sys.argv)
print('Not implemented yet')
processes = []
HM_listener_proc = multiprocessing.Process(name='HM_listener',
target=HM_listener)
processes.append(HM_listener_proc)
HM_health_check_proc = multiprocessing.Process(name='HM_health_check',
target=HM_health_check)
processes.append(HM_health_check_proc)
LOG.info(_LI("Health Manager listener process starts:"))
HM_listener_proc.start()
LOG.info(_LI("Health manager check process starts:"))
HM_health_check_proc.start()
try:
for process in processes:
process.join()
except KeyboardInterrupt:
LOG.info(_LI("Health Manager existing due to signal"))
HM_listener_proc.terminate()
HM_health_check_proc.terminate()

View File

@ -84,6 +84,19 @@ networking_opts = [
cfg.StrOpt('lb_network_name', help=_('Name of amphora internal network')),
]
healthmanager_opts = [
cfg.IntOpt('failover_threads',
default=10,
help=_('Number of threads performing amphora failovers.')),
cfg.IntOpt('interval',
default=3,
help=_('Sleep time between health checks in seconds.')),
cfg.IntOpt('heartbeat_timeout',
default=10,
help=_('Interval, in seconds, to wait before failing over an '
'amphora.')),
]
oslo_messaging_opts = [
cfg.StrOpt('topic'),
]
@ -232,6 +245,7 @@ cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging')
cfg.CONF.register_opts(house_keeping_opts, group='house_keeping')
cfg.CONF.register_cli_opts(core_cli_opts)
cfg.CONF.register_opts(certificate_opts, group='certificates')
cfg.CONF.register_cli_opts(healthmanager_opts, group='health_manager')
cfg.CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')
cfg.CONF.register_opts(keystone_authtoken_v3_opts,
group='keystone_authtoken_v3')

View File

@ -241,6 +241,7 @@ class Amphora(BaseDataModel):
class AmphoraHealth(BaseDataModel):
def __init__(self, amphora_id=None, last_update=None):
def __init__(self, amphora_id=None, last_update=None, busy=False):
self.amphora_id = amphora_id
self.last_update = last_update
self.busy = busy

View File

@ -0,0 +1,59 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from concurrent import futures
from oslo_config import cfg
from oslo_log import log as logging
from octavia.controller.worker import controller_worker as cw
from octavia.db import api as db_api
from octavia.db import repositories as repo
from octavia.i18n import _LI
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
CONF.import_group('health_manager', 'octavia.common.config')
class HealthManager(object):
def __init__(self):
self.cw = cw.ControllerWorker()
self.threads = CONF.health_manager.failover_threads
def health_check(self):
amp_health_repo = repo.AmphoraHealthRepository()
with futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
try:
while True:
time.sleep(CONF.health_manager.interval)
session = db_api.get_session()
LOG.debug("Starting amphora health check")
failover_count = 0
while True:
amp = amp_health_repo.get_stale_amphora(session)
if amp is None:
break
failover_count += 1
LOG.info(_LI("Stale amphora's id is: %s") %
amp.amphora_id)
executor.submit(self.cw.failover_amphora,
amp.amphora_id)
if failover_count > 0:
LOG.info(_LI("Failed over %s amphora") %
failover_count)
finally:
executor.shutdown(wait=True)

View File

@ -0,0 +1,32 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Add a column busy in table amphora health
Revision ID: 543f5d8e4e56
Revises: 2351ea316465
Create Date: 2015-07-27 11:32:16.685383
"""
# revision identifiers, used by Alembic.
revision = '543f5d8e4e56'
down_revision = '2351ea316465'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column(u'amphora_health',
sa.Column(u'busy', sa.Boolean(), nullable=False))

View File

@ -30,12 +30,9 @@ import sqlalchemy as sa
def upgrade():
op.create_table(
u'amphora_health',
sa.Column(u'amphora_id', sa.String(36), nullable=False),
sa.Column(u'amphora_id', sa.String(36), nullable=False,
primary_key=True),
sa.Column(u'last_update', sa.DateTime(timezone=True),
nullable=False)
)
def downgrade():
op.drop_table('health_manager')

View File

@ -358,3 +358,5 @@ class AmphoraHealth(base_models.BASE):
sa.String(36), nullable=False, primary_key=True)
last_update = sa.Column(sa.DateTime, default=func.now(),
nullable=False)
busy = sa.Column(sa.Boolean(), default=False, nullable=False)

View File

@ -19,12 +19,18 @@ reference
import datetime
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
from octavia.common import constants
from octavia.common import exceptions
from octavia.db import models
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
CONF.import_group('health_manager', 'octavia.common.config')
class BaseRepository(object):
model_class = None
@ -407,18 +413,26 @@ class AmphoraHealthRepository(BaseRepository):
amphora_health = self.get(session, amphora_id=amphora_id)
return amphora_health.last_update < timestamp
def get_expired_amphorae(self, session, timestamp=None):
"""Retrieves a list of entities from the health manager database.
def get_stale_amphora(self, session):
"""Retrieves a staled amphora from the health manager database.
:param session: A Sql Alchemy database session.
:param timestamp: A standard datetime which is used to see if an
amphora needs to be updated (default: now - 10s)
:returns: [octavia.common.data_model]
"""
if not timestamp:
timestamp = datetime.datetime.utcnow() - datetime.timedelta(
seconds=10)
filterquery = self.model_class.last_update < timestamp
model_list = session.query(self.model_class).filter(filterquery).all()
data_model_list = [model.to_data_model() for model in model_list]
return data_model_list
timestamp = CONF.health_manager.heartbeat_timeout
expired_time = datetime.datetime.utcnow() - datetime.timedelta(
seconds=timestamp)
with session.begin(subtransactions=True):
amp = session.query(self.model_class).with_for_update().filter_by(
busy=False).filter(
self.model_class.last_update < expired_time).first()
if amp is None:
return None
amp.busy = True
return amp.to_data_model()

View File

@ -129,7 +129,8 @@ class ModelTestMixin(object):
def create_amphora_health(self, session, **overrides):
kwargs = {'amphora_id': self.FAKE_UUID_1,
'last_update': datetime.date.today()}
'last_update': datetime.date.today(),
'busy': True}
kwargs.update(overrides)
return self._insert(session, models.AmphoraHealth, kwargs)

View File

@ -1166,7 +1166,8 @@ class AmphoraHealthRepositoryTest(BaseRepositoryTest):
amphora_health = self.amphora_health_repo.create(
self.session, amphora_id=amphora_id,
last_update=newdate)
last_update=newdate,
busy=False)
return amphora_health
def test_get(self):
@ -1193,11 +1194,11 @@ class AmphoraHealthRepositoryTest(BaseRepositoryTest):
self.session, self.amphora.id, exp_age)
self.assertTrue(checkres)
def test_get_expired_amphorae(self):
def test_get_stale_amphorae(self):
self.create_amphora_health(self.amphora.id)
amphora_list = self.amphora_health_repo.get_expired_amphorae(
stale_amphora = self.amphora_health_repo.get_stale_amphora(
self.session)
self.assertEqual(len(amphora_list), 1)
self.assertEqual(stale_amphora.amphora_id, self.amphora.id)
def test_create(self):
amphora_health = self.create_amphora_health(self.FAKE_UUID_1)

View File

@ -0,0 +1,84 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
import six
from octavia.controller.healthmanager import health_manager as healthmanager
import octavia.tests.unit.base as base
if six.PY2:
import mock
else:
import unittest.mock as mock
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
CONF.import_group('health_manager', 'octavia.common.config')
AMPHORA_ID = uuidutils.generate_uuid()
class TestException(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class TestHealthManager(base.TestCase):
def setUp(self):
super(TestHealthManager, self).setUp()
@mock.patch('octavia.controller.worker.controller_worker.'
'ControllerWorker.failover_amphora')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.'
'get_stale_amphora')
@mock.patch('time.sleep')
@mock.patch('octavia.db.api.get_session')
def test_health_check_stale_amphora(self, session_mock,
sleep_mock, get_stale_amp_mock,
failover_mock):
amphora_health = mock.MagicMock()
amphora_health.amphora_id = AMPHORA_ID
session_mock.side_effect = [None, TestException('test')]
get_stale_amp_mock.side_effect = [amphora_health, None]
hm = healthmanager.HealthManager()
self.assertRaises(TestException, hm.health_check)
failover_mock.assert_called_once_with(AMPHORA_ID)
@mock.patch('octavia.controller.worker.controller_worker.'
'ControllerWorker.failover_amphora')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.'
'get_stale_amphora', return_value=None)
@mock.patch('time.sleep')
@mock.patch('octavia.db.api.get_session')
def test_health_check_nonestale_amphora(self, session_mock,
sleep_mock, get_stale_amp_mock,
failover_mock):
session_mock.side_effect = [None, TestException('test')]
get_stale_amp_mock.return_value = None
hm = healthmanager.HealthManager()
self.assertRaises(TestException, hm.health_check)
self.assertFalse(failover_mock.called)