Merge "Create database models for healthmanager"

This commit is contained in:
Jenkins 2015-03-26 21:59:49 +00:00 committed by Gerrit Code Review
commit b435b50e4c
11 changed files with 438 additions and 17 deletions

View File

@ -200,3 +200,10 @@ class Amphora(BaseDataModel):
self.status = status self.status = status
self.lb_network_ip = lb_network_ip self.lb_network_ip = lb_network_ip
self.load_balancer = load_balancer self.load_balancer = load_balancer
class AmphoraHealth(BaseDataModel):
def __init__(self, amphora_id=None, last_update=None):
self.amphora_id = amphora_id
self.last_update = last_update

View File

@ -0,0 +1,118 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
from sqlalchemy.sql import func
from octavia.amphorae.drivers import driver_base as driver_base
from octavia.common import constants
from octavia.db import api as db_api
from octavia.db import repositories as repo
from octavia.openstack.common import log as logging
import six
LOG = logging.getLogger(__name__)
class UpdateHealthMixin(driver_base.HealthMixin):
def __init__(self):
super(UpdateHealthMixin, self).__init__()
# first setup repo for amphora, listener,member(nodes),pool repo
self.amphora_health_repo = repo.AmphoraHealthRepository()
self.listener_repo = repo.ListenerRepository()
self.member_repo = repo.MemberRepository()
self.pool_repo = repo.PoolRepository()
def update_health(self, health):
"""This function is to update db info based on amphora status
:param health: map object that contains amphora, listener, member info
:type map: string
:returns: null
This function has the following 3 goals:
1)Update the health_manager table based on amphora status is up/down
2)Update related DB status to be ERROR/DOWN when amphora is down
3)Update related DB status to be ACTIVATE/UP when amphora is up
4)Track the status of the members
The input health data structure is shown as below:
health = {
"amphora-status": "AMPHORA_UP",
"amphora-id": FAKE_UUID_1,
"listeners": {
"listener-id-1": {"listener-status": "ONLINE",
"members": {
"member-id-1": "ONLINE",
"member-id-2": "ONLINE"
}
},
"listener-id-2": {"listener-status": "ONLINE",
"members": {
"member-id-3": "ERROR",
"member-id-4": "ERROR",
"member-id-5": "ONLINE"
}
}
}
}
"""
session = db_api.get_session()
# if the input amphora is healthy, we update its db info
# before update db, we need to check if the db has been created,
# if not, we need to create first
if health["amphora-status"] == constants.AMPHORA_UP:
amphora_id = health["amphora-id"]
amphora = self.amphora_health_repo.get(
session, amphora_id=amphora_id)
if amphora is None:
self.amphora_health_repo.create(session,
amphora_id=amphora_id,
last_update=func.now())
else:
self.amphora_health_repo.update(session, amphora_id,
last_update=func.now())
# update listener and nodes db information
listeners = health['listeners']
for listener_id, listener in six.iteritems(listeners):
if listener.get("listener-status") == constants.ONLINE:
try:
self.listener_repo.update(
session, listener_id,
operating_status=constants.ONLINE)
except sqlalchemy.orm.exc.NoResultFound:
LOG.debug("Listener %s is not in DB", listener_id)
elif listener.get("listener-status") == constants.ERROR:
try:
self.listener_repo.update(session, listener_id,
operating_status=constants.ERROR)
except sqlalchemy.orm.exc.NoResultFound:
LOG.debug("Listener %s is not in DB", listener_id)
members = listener['members']
for member_id, member in six.iteritems(members):
if member in constants.SUPPORTED_OPERATING_STATUSES:
try:
self.member_repo.update(
session, id=member_id,
operating_status=member)
except sqlalchemy.orm.exc.NoResultFound:
LOG.DEBUG("Member %s is not able to update in DB",
member_id)

View File

@ -0,0 +1,41 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""create healthmanager table
Revision ID: 92fe9857279
Revises: 256852d5ff7c
Create Date: 2015-01-22 16:58:23.440247
"""
# revision identifiers, used by Alembic.
revision = '92fe9857279'
down_revision = '256852d5ff7c'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
u'amphora_health',
sa.Column(u'amphora_id', sa.String(36), nullable=False),
sa.Column(u'last_update', sa.DateTime(timezone=True),
nullable=False)
)
def downgrade():
op.drop_table('health_manager')

View File

@ -12,9 +12,11 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy import orm from sqlalchemy import orm
from sqlalchemy.orm import validates from sqlalchemy.orm import validates
from sqlalchemy.sql import func
from octavia.common import data_models from octavia.common import data_models
from octavia.db import base_models from octavia.db import base_models
@ -323,3 +325,13 @@ class Amphora(base_models.BASE):
sa.String(36), sa.String(36),
sa.ForeignKey("provisioning_status.name", sa.ForeignKey("provisioning_status.name",
name="fk_container_provisioning_status_name")) name="fk_container_provisioning_status_name"))
class AmphoraHealth(base_models.BASE):
__data_model__ = data_models.AmphoraHealth
__tablename__ = "amphora_health"
amphora_id = sa.Column(
sa.String(36), nullable=False, primary_key=True)
last_update = sa.Column(sa.DateTime, default=func.now(),
nullable=False)

View File

@ -22,9 +22,10 @@ from octavia.common import exceptions
from octavia.db import models from octavia.db import models
from octavia.openstack.common import uuidutils from octavia.openstack.common import uuidutils
import datetime
class BaseRepository(object): class BaseRepository(object):
model_class = None model_class = None
def create(self, session, **model_kwargs): def create(self, session, **model_kwargs):
@ -101,7 +102,6 @@ class BaseRepository(object):
class Repositories(object): class Repositories(object):
def __init__(self): def __init__(self):
self.load_balancer = LoadBalancerRepository() self.load_balancer = LoadBalancerRepository()
self.vip = VipRepository() self.vip = VipRepository()
@ -113,6 +113,7 @@ class Repositories(object):
self.listener_stats = ListenerStatisticsRepository() self.listener_stats = ListenerStatisticsRepository()
self.amphora = AmphoraRepository() self.amphora = AmphoraRepository()
self.sni = SNIRepository() self.sni = SNIRepository()
self.amphorahealth = AmphoraHealthRepository()
def create_load_balancer_and_vip(self, session, load_balancer_dict, def create_load_balancer_and_vip(self, session, load_balancer_dict,
vip_dict): vip_dict):
@ -203,7 +204,6 @@ class Repositories(object):
class LoadBalancerRepository(BaseRepository): class LoadBalancerRepository(BaseRepository):
model_class = models.LoadBalancer model_class = models.LoadBalancer
def test_and_set_provisioning_status(self, session, id, status): def test_and_set_provisioning_status(self, session, id, status):
@ -230,7 +230,6 @@ class LoadBalancerRepository(BaseRepository):
class VipRepository(BaseRepository): class VipRepository(BaseRepository):
model_class = models.Vip model_class = models.Vip
def update(self, session, load_balancer_id, **model_kwargs): def update(self, session, load_balancer_id, **model_kwargs):
@ -241,7 +240,6 @@ class VipRepository(BaseRepository):
class HealthMonitorRepository(BaseRepository): class HealthMonitorRepository(BaseRepository):
model_class = models.HealthMonitor model_class = models.HealthMonitor
def update(self, session, pool_id, **model_kwargs): def update(self, session, pool_id, **model_kwargs):
@ -252,7 +250,6 @@ class HealthMonitorRepository(BaseRepository):
class SessionPersistenceRepository(BaseRepository): class SessionPersistenceRepository(BaseRepository):
model_class = models.SessionPersistence model_class = models.SessionPersistence
def update(self, session, pool_id, **model_kwargs): def update(self, session, pool_id, **model_kwargs):
@ -268,12 +265,10 @@ class SessionPersistenceRepository(BaseRepository):
class PoolRepository(BaseRepository): class PoolRepository(BaseRepository):
model_class = models.Pool model_class = models.Pool
class MemberRepository(BaseRepository): class MemberRepository(BaseRepository):
model_class = models.Member model_class = models.Member
def delete_members(self, session, member_ids): def delete_members(self, session, member_ids):
@ -282,7 +277,6 @@ class MemberRepository(BaseRepository):
class ListenerRepository(BaseRepository): class ListenerRepository(BaseRepository):
model_class = models.Listener model_class = models.Listener
def has_pool(self, session, id): def has_pool(self, session, id):
@ -292,7 +286,6 @@ class ListenerRepository(BaseRepository):
class ListenerStatisticsRepository(BaseRepository): class ListenerStatisticsRepository(BaseRepository):
model_class = models.ListenerStatistics model_class = models.ListenerStatistics
def update(self, session, listener_id, **model_kwargs): def update(self, session, listener_id, **model_kwargs):
@ -303,7 +296,6 @@ class ListenerStatisticsRepository(BaseRepository):
class AmphoraRepository(BaseRepository): class AmphoraRepository(BaseRepository):
model_class = models.Amphora model_class = models.Amphora
def associate(self, session, load_balancer_id, amphora_id): def associate(self, session, load_balancer_id, amphora_id):
@ -317,7 +309,6 @@ class AmphoraRepository(BaseRepository):
class SNIRepository(BaseRepository): class SNIRepository(BaseRepository):
model_class = models.SNI model_class = models.SNI
def update(self, session, listener_id=None, tls_container_id=None, def update(self, session, listener_id=None, tls_container_id=None,
@ -332,3 +323,44 @@ class SNIRepository(BaseRepository):
elif tls_container_id: elif tls_container_id:
session.query(self.model_class).filter_by( session.query(self.model_class).filter_by(
tls_container_id=tls_container_id).update(model_kwargs) tls_container_id=tls_container_id).update(model_kwargs)
class AmphoraHealthRepository(BaseRepository):
model_class = models.AmphoraHealth
def update(self, session, amphora_id, **model_kwargs):
"""Updates a healthmanager entity in the database by amphora_id."""
with session.begin(subtransactions=True):
session.query(self.model_class).filter_by(
amphora_id=amphora_id).update(model_kwargs)
def check_amphora_expired(self, session, amphora_id, timestamp=None):
"""check if a specific amphora is expired
:param session: A Sql Alchemy database session.
:param amphora_id: id of an amphora object
:param timestamp: A standard datetime which is used to see if an
amphora needs to be updated (default: now - 10s)
:returns: boolean
"""
if not timestamp:
timestamp = datetime.datetime.utcnow() - datetime.timedelta(
seconds=10)
amphora_health = self.get(session, amphora_id=amphora_id)
return amphora_health.last_update < timestamp
def get_expired_amphorae(self, session, timestamp=None):
"""Retrieves a list of entities from the health manager database.
:param session: A Sql Alchemy database session.
:param timestamp: A standard datetime which is used to see if an
amphora needs to be updated (default: now - 10s)
:returns: [octavia.common.data_model]
"""
if not timestamp:
timestamp = datetime.datetime.utcnow() - datetime.timedelta(
seconds=10)
filterquery = self.model_class.last_update < timestamp
model_list = session.query(self.model_class).filter(filterquery).all()
data_model_list = [model.to_data_model() for model in model_list]
return data_model_list

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import datetime
from octavia.common import constants from octavia.common import constants
from octavia.common import data_models from octavia.common import data_models
from octavia.db import models from octavia.db import models
@ -120,6 +122,12 @@ class ModelTestMixin(object):
kwargs.update(overrides) kwargs.update(overrides)
return self._insert(session, models.Amphora, kwargs) return self._insert(session, models.Amphora, kwargs)
def create_amphora_health(self, session, **overrides):
kwargs = {'amphora_id': self.FAKE_UUID_1,
'last_update': datetime.date.today()}
kwargs.update(overrides)
return self._insert(session, models.AmphoraHealth, kwargs)
class PoolModelTest(base.OctaviaDBTestBase, ModelTestMixin): class PoolModelTest(base.OctaviaDBTestBase, ModelTestMixin):
@ -538,6 +546,36 @@ class AmphoraModelTest(base.OctaviaDBTestBase, ModelTestMixin):
self.assertIsInstance(new_amphora.load_balancer, models.LoadBalancer) self.assertIsInstance(new_amphora.load_balancer, models.LoadBalancer)
class AmphoraHealthModelTest(base.OctaviaDBTestBase, ModelTestMixin):
def setUp(self):
super(AmphoraHealthModelTest, self).setUp()
self.amphora = self.create_amphora(self.session)
def test_create(self):
self.create_amphora_health(self.session)
def test_update(self):
amphora_health = self.create_amphora_health(self.session)
d = datetime.date.today()
newdate = d.replace(day=d.day)
amphora_health.last_update = newdate
new_amphora_health = self.session.query(
models.AmphoraHealth).filter_by(
amphora_id=amphora_health.amphora_id).first()
self.assertEqual(newdate, new_amphora_health.last_update.date())
def test_delete(self):
amphora_health = self.create_amphora_health(
self.session)
with self.session.begin():
self.session.delete(amphora_health)
self.session.flush()
new_amphora_health = self.session.query(
models.AmphoraHealth).filter_by(
amphora_id=amphora_health.amphora_id).first()
self.assertIsNone(new_amphora_health)
class DataModelConversionTest(base.OctaviaDBTestBase, ModelTestMixin): class DataModelConversionTest(base.OctaviaDBTestBase, ModelTestMixin):
def setUp(self): def setUp(self):

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import datetime
from octavia.common import constants from octavia.common import constants
from octavia.common import data_models as models from octavia.common import data_models as models
from octavia.db import repositories as repo from octavia.db import repositories as repo
@ -38,6 +40,7 @@ class BaseRepositoryTest(base.OctaviaDBTestBase):
self.hm_repo = repo.HealthMonitorRepository() self.hm_repo = repo.HealthMonitorRepository()
self.sni_repo = repo.SNIRepository() self.sni_repo = repo.SNIRepository()
self.amphora_repo = repo.AmphoraRepository() self.amphora_repo = repo.AmphoraRepository()
self.amphora_health_repo = repo.AmphoraHealthRepository()
def test_get_all_return_value(self): def test_get_all_return_value(self):
pool_list = self.pool_repo.get_all(self.session, pool_list = self.pool_repo.get_all(self.session,
@ -67,7 +70,8 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
def test_all_repos_has_correct_repos(self): def test_all_repos_has_correct_repos(self):
repo_attr_names = ('load_balancer', 'vip', 'health_monitor', repo_attr_names = ('load_balancer', 'vip', 'health_monitor',
'session_persistence', 'pool', 'member', 'listener', 'session_persistence', 'pool', 'member', 'listener',
'listener_stats', 'amphora', 'sni') 'listener_stats', 'amphora', 'sni',
'amphorahealth')
for repo_attr in repo_attr_names: for repo_attr in repo_attr_names:
single_repo = getattr(self.repos, repo_attr, None) single_repo = getattr(self.repos, repo_attr, None)
message = ("Class Repositories should have %s instance" message = ("Class Repositories should have %s instance"
@ -1103,3 +1107,67 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
self.assertIsNone(self.amphora_repo.get(self.session, id=amphora.id)) self.assertIsNone(self.amphora_repo.get(self.session, id=amphora.id))
new_lb = self.lb_repo.get(self.session, id=self.lb.id) new_lb = self.lb_repo.get(self.session, id=self.lb.id)
self.assertEqual(0, len(new_lb.amphorae)) self.assertEqual(0, len(new_lb.amphorae))
class AmphoraHealthRepositoryTest(BaseRepositoryTest):
def setUp(self):
super(AmphoraHealthRepositoryTest, self).setUp()
self.amphora = self.amphora_repo.create(self.session,
id=self.FAKE_UUID_1,
compute_id=self.FAKE_UUID_3,
status=constants.ACTIVE,
lb_network_ip=self.FAKE_IP)
def create_amphora_health(self, amphora_id):
newdate = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
amphora_health = self.amphora_health_repo.create(
self.session, amphora_id=amphora_id,
last_update=newdate)
return amphora_health
def test_get(self):
amphora_health = self.create_amphora_health(self.amphora.id)
new_amphora_health = self.amphora_health_repo.get(
self.session, amphora_id=amphora_health.amphora_id)
self.assertIsInstance(new_amphora_health, models.AmphoraHealth)
self.assertEqual(amphora_health, new_amphora_health)
def test_check_amphora_out_of_date(self):
self.create_amphora_health(self.amphora.id)
checkres = self.amphora_health_repo.check_amphora_expired(
self.session, self.amphora.id)
self.assertTrue(checkres)
def test_get_expired_amphorae(self):
self.create_amphora_health(self.amphora.id)
amphora_list = self.amphora_health_repo.get_expired_amphorae(
self.session)
self.assertEqual(len(amphora_list), 1)
def test_create(self):
amphora_health = self.create_amphora_health(self.FAKE_UUID_1)
self.assertEqual(self.FAKE_UUID_1, amphora_health.amphora_id)
newcreatedtime = datetime.datetime.utcnow()
oldcreatetime = amphora_health.last_update
diff = newcreatedtime - oldcreatetime
self.assertEqual(diff.seconds, 600)
def test_update(self):
d = datetime.datetime.today()
amphora_health = self.create_amphora_health(self.FAKE_UUID_1)
self.amphora_health_repo.update(self.session,
amphora_health.amphora_id,
last_update=d)
new_amphora_health = self.amphora_health_repo.get(
self.session, amphora_id=amphora_health.amphora_id)
self.assertEqual(d, new_amphora_health.last_update)
def test_delete(self):
amphora_health = self.create_amphora_health(self.FAKE_UUID_1)
self.amphora_health_repo.delete(
self.session, amphora_id=amphora_health.amphora_id)
self.assertIsNone(self.amphora_health_repo.get(
self.session, amphora_id=amphora_health.amphora_id))

View File

@ -19,7 +19,9 @@ from octavia.openstack.common import log as logging
from octavia.openstack.common import uuidutils from octavia.openstack.common import uuidutils
from octavia.tests.unit import base as base from octavia.tests.unit import base as base
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
FAKE_UUID_1 = uuidutils.generate_uuid()
class LoggingMixIn(base.TestCase): class LoggingMixIn(base.TestCase):

View File

@ -0,0 +1,103 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from octavia.common import constants
from octavia.controller.healthmanager import update_health_mixin as healthmixin
from octavia.openstack.common import uuidutils
import octavia.tests.unit.base as base
class TestUpdateHealthMixin(base.TestCase):
FAKE_UUID_1 = uuidutils.generate_uuid()
def setUp(self):
super(TestUpdateHealthMixin, self).setUp()
self.hm = healthmixin.UpdateHealthMixin()
self.amphora_health_repo = mock.MagicMock()
self.listener_repo = mock.MagicMock()
self.member_repo = mock.MagicMock()
self.pool_repo = mock.MagicMock()
self.hm.amphora_health_repo = self.amphora_health_repo
self.hm.listener_repo = self.listener_repo
self.hm.member_repo = self.member_repo
self.hm.pool_repo = self.pool_repo
@mock.patch('octavia.db.api.get_session')
@mock.patch('sqlalchemy.sql.func.now')
def test_update_health_Online(self, lastupdate, session):
health = {
"amphora-status": constants.AMPHORA_UP,
"amphora-id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"listener-status": constants.ONLINE,
"members": {"member-id-1": constants.ONLINE}
}
}
}
session.return_value = 'blah'
lastupdate.return_value = '2014-02-12'
self.hm.update_health(health)
self.assertTrue(self.amphora_health_repo.update.called)
# test listener, member
for listener_id, listener in health.get('listeners', {}).iteritems():
self.listener_repo.update.assert_any_call(
'blah', listener_id, operating_status=constants.ONLINE)
for member_id, member in listener.get('members', {}).iteritems():
self.member_repo.update.assert_any_call(
'blah', id=member_id, operating_status=constants.ONLINE)
@mock.patch('octavia.db.api.get_session')
@mock.patch('sqlalchemy.sql.func.now')
def test_update_health_Error(self, lastupdate, session):
health = {
"amphora-status": constants.AMPHORA_DOWN,
"amphora-id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"listener-status": constants.ERROR,
"members": {"member-id-1": constants.ERROR}
},
"listener-id-2": {"listener-status": constants.ERROR,
"members": {"member-id-2": constants.ERROR}
}
}
}
session.return_value = 'blah'
lastupdate.return_value = '2014-02-12'
self.hm.update_health(health)
self.assertFalse(self.amphora_health_repo.update.called)
# test listener, member
for listener_id, listener in health.get('listeners', {}).iteritems():
self.listener_repo.update.assert_any_call(
'blah', listener_id, operating_status=constants.ERROR)
for member_id, member in listener.get('members', {}).iteritems():
self.member_repo.update.assert_any_call(
'blah', id=member_id, operating_status=constants.ERROR)