Add service registry

The proposed service registry provides a mechanism to track all running
kingbird engines by making them periodically update service entries in
the database. If one of the engines fails to update the database it will
be removed from the registry and corresponding lock entries will also be
purged.

Change-Id: Ifd785073343c5873f69dfb10146f1957d7b92264
Closes-Bug: #1610609
Implements: blueprint engine-reporting
This commit is contained in:
Dimitri Mazmanov
2016-08-10 13:07:32 +02:00
parent 5ff62dd2bd
commit 62fea4fa06
12 changed files with 474 additions and 9 deletions

View File

@@ -21,6 +21,9 @@ global_opts = [
default=True,
help='Enables or disables use of default quota class '
'with default quota.'),
cfg.IntOpt('report_interval',
default=60,
help='Seconds between running periodic reporting tasks.'),
]
# Global nova quotas for all projects
nova_quotas = [
@@ -137,7 +140,6 @@ scheduler_opts = [
help='periodic time interval for automatic quota sync job')
]
common_opts = [
cfg.IntOpt('workers', default=1,
help='number of workers'),
@@ -146,7 +148,6 @@ common_opts = [
help='hostname of the machine')
]
scheduler_opt_group = cfg.OptGroup('scheduler',
title='Scheduler options for periodic job')
# The group stores Kingbird global limit for all the projects

View File

@@ -23,7 +23,6 @@ from oslo_db import api
CONF = cfg.CONF
_BACKEND_MAPPING = {'sqlalchemy': 'kingbird.db.sqlalchemy.api'}
IMPL = api.DBAPI.from_config(CONF, backend_mapping=_BACKEND_MAPPING)
@@ -36,6 +35,7 @@ def get_engine():
def get_session():
return IMPL.get_session()
# quota usage db methods
###################
@@ -126,3 +126,25 @@ def sync_lock_release(context, task_type):
def sync_lock_steal(context, engine_id, task_type):
return IMPL.sync_lock_steal(context, engine_id, task_type)
def service_create(context, service_id, host=None, binary=None,
topic=None):
return IMPL.service_create(context, service_id=service_id, host=host,
binary=binary, topic=topic)
def service_update(context, service_id, values=None):
return IMPL.service_update(context, service_id, values=values)
def service_delete(context, service_id):
return IMPL.service_delete(context, service_id)
def service_get(context, service_id):
return IMPL.service_get(context, service_id)
def service_get_all(context):
return IMPL.service_get_all(context)

View File

@@ -23,6 +23,7 @@ from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db.sqlalchemy import session as db_session
from oslo_log import log as logging
from oslo_utils import timeutils
from kingbird.common import exceptions as exception
from kingbird.common.i18n import _
@@ -316,3 +317,56 @@ def sync_lock_release(context, task_type):
def sync_lock_steal(context, engine_id, task_type):
sync_lock_release(context, task_type)
return sync_lock_acquire(context, engine_id, task_type)
def service_create(context, service_id, host=None, binary=None,
topic=None):
session = _session(context)
with session.begin():
time_now = timeutils.utcnow()
svc = models.Service(id=service_id,
host=host,
binary=binary,
topic=topic,
created_at=time_now,
updated_at=time_now)
session.add(svc)
return svc
def service_update(context, service_id, values=None):
session = _session(context)
with session.begin():
service = session.query(models.Service).get(service_id)
if not service:
return
if values is None:
values = {}
values.update({'updated_at': timeutils.utcnow()})
service.update(values)
service.save(session)
return service
def service_delete(context, service_id):
session = _session(context)
with session.begin():
session.query(models.Service).filter_by(
id=service_id).delete(synchronize_session='fetch')
# Remove all engine locks
locks = model_query(context, models.SyncLock). \
filter_by(engine_id=service_id).all()
for lock in locks:
lock.delete(session=session)
def service_get(context, service_id):
return model_query(context, models.Service).get(service_id)
def service_get_all(context):
return model_query(context, models.Service).all()

View File

@@ -0,0 +1,37 @@
# Copyright (c) 2015 Ericsson AB.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
def upgrade(migrate_engine):
meta = sqlalchemy.MetaData()
meta.bind = migrate_engine
service = sqlalchemy.Table(
'service', meta,
sqlalchemy.Column('id', sqlalchemy.String(36),
primary_key=True, nullable=False),
sqlalchemy.Column('host', sqlalchemy.String(length=255)),
sqlalchemy.Column('binary', sqlalchemy.String(length=255)),
sqlalchemy.Column('topic', sqlalchemy.String(length=255)),
sqlalchemy.Column('disabled', sqlalchemy.Boolean, default=False),
sqlalchemy.Column('disabled_reason', sqlalchemy.String(length=255)),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted', sqlalchemy.Integer),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
service.create()

View File

@@ -20,7 +20,7 @@ from oslo_config import cfg
from oslo_db.sqlalchemy import models
from sqlalchemy.orm import session as orm_session
from sqlalchemy import (Column, Integer, String, schema)
from sqlalchemy import (Column, Integer, String, Boolean, schema)
from sqlalchemy.ext.declarative import declarative_base
CONF = cfg.CONF
@@ -126,3 +126,21 @@ class SyncLock(BASE, KingbirdBase):
timer_lock = Column(String(255), nullable=False)
task_type = Column(String(36), nullable=False)
class Service(BASE, KingbirdBase):
""""Kingbird service engine registry"""
__tablename__ = 'service'
id = Column('id', String(36), primary_key=True, nullable=False)
host = Column(String(255))
binary = Column(String(255))
topic = Column(String(255))
disabled = Column(Boolean, default=False)
disabled_reason = Column(String(255))

View File

@@ -12,14 +12,12 @@
import six
import time
import uuid
import functools
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from kingbird.common import consts
from kingbird.common import context
from kingbird.common import exceptions
@@ -28,10 +26,11 @@ from kingbird.common.i18n import _LE
from kingbird.common.i18n import _LI
from kingbird.common import messaging as rpc_messaging
from kingbird.engine.quota_manager import QuotaManager
from kingbird.engine import scheduler
from kingbird.objects import service as service_obj
from oslo_service import service
from oslo_utils import timeutils
from oslo_utils import uuidutils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@@ -83,7 +82,7 @@ class EngineService(service.Service):
self.qm = QuotaManager()
def start(self):
self.engine_id = str(uuid.uuid4())
self.engine_id = uuidutils.generate_uuid()
self.init_tgm()
self.init_qm()
target = oslo_messaging.Target(version=self.rpc_api_version,
@@ -93,12 +92,42 @@ class EngineService(service.Service):
self._rpc_server = rpc_messaging.get_rpc_server(self.target, self)
self._rpc_server.start()
self.service_registry_cleanup()
self.TG.add_timer(cfg.CONF.report_interval,
self.service_registry_report)
super(EngineService, self).start()
if self.periodic_enable:
LOG.info("Adding periodic tasks for the engine to perform")
self.TG.add_timer(self.periodic_interval,
self.periodic_balance_all, None, self.engine_id)
def service_registry_report(self):
ctx = context.get_admin_context()
try:
svc = service_obj.Service.update(ctx, self.engine_id)
# if svc is None, means it's not created.
if svc is None:
service_obj.Service.create(ctx, self.engine_id, self.host,
'kingbird-engine', self.topic)
except Exception as ex:
LOG.error(_LE('Service %(service_id)s update failed: %(error)s'),
{'service_id': self.engine_id, 'error': ex})
def service_registry_cleanup(self):
ctx = context.get_admin_context()
time_window = (2 * cfg.CONF.report_interval)
services = service_obj.Service.get_all(ctx)
for svc in services:
if svc['id'] == self.engine_id:
continue
if timeutils.is_older_than(svc['updated_at'], time_window):
# < time_line:
# hasn't been updated, assuming it's died.
LOG.info(_LI('Service %s was aborted'), svc['id'])
service_obj.Service.delete(ctx, svc['id'])
def periodic_balance_all(self, engine_id):
# Automated Quota Sync for all the keystone projects
LOG.info(_LI("Periodic quota sync job started at: %s"),

View File

69
kingbird/objects/base.py Normal file
View File

@@ -0,0 +1,69 @@
# Copyright (c) 2015 Ericsson AB.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Kingbird common internal object model"""
from oslo_utils import versionutils
from oslo_versionedobjects import base
from kingbird import objects
VersionedObjectDictCompat = base.VersionedObjectDictCompat
class KingbirdObject(base.VersionedObject):
"""Base class for kingbird objects.
This is the base class for all objects that can be remoted or instantiated
via RPC. Simply defining a sub-class of this class would make it remotely
instantiatable. Objects should implement the "get" class method and the
"save" object method.
"""
OBJ_PROJECT_NAMESPACE = 'kingbird'
VERSION = '1.0'
@staticmethod
def _from_db_object(context, obj, db_obj):
if db_obj is None:
return None
for field in obj.fields:
if field == 'metadata':
obj['metadata'] = db_obj['meta_data']
else:
obj[field] = db_obj[field]
obj._context = context
obj.obj_reset_changes()
return obj
class KingbirdObjectRegistry(base.VersionedObjectRegistry):
def registration_hook(self, cls, index):
"""Callback for object registration.
When an object is registered, this function will be called for
maintaining kingbird.objects.$OBJECT as the highest-versioned
implementation of a given object.
"""
version = versionutils.convert_version_to_tuple(cls.VERSION)
if not hasattr(objects, cls.obj_name()):
setattr(objects, cls.obj_name(), cls)
else:
curr_version = versionutils.convert_version_to_tuple(
getattr(objects, cls.obj_name()).VERSION)
if version >= curr_version:
setattr(objects, cls.obj_name(), cls)

View File

@@ -0,0 +1,63 @@
# Copyright (c) 2015 Ericsson AB.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Service object."""
from kingbird.db import api as db_api
from kingbird.objects import base
from oslo_versionedobjects import fields
@base.KingbirdObjectRegistry.register
class Service(base.KingbirdObject, base.VersionedObjectDictCompat):
"""Kingbird service object."""
fields = {
'id': fields.UUIDField(),
'host': fields.StringField(),
'binary': fields.StringField(),
'topic': fields.StringField(),
'disabled': fields.BooleanField(),
'disabled_reason': fields.StringField(nullable=True),
'created_at': fields.DateTimeField(),
'updated_at': fields.DateTimeField(),
'deleted_at': fields.DateTimeField(),
'deleted': fields.IntegerField(),
}
@classmethod
def create(cls, context, service_id, host=None, binary=None, topic=None):
obj = db_api.service_create(context, service_id=service_id, host=host,
binary=binary, topic=topic)
return cls._from_db_object(context, cls(context), obj)
@classmethod
def get(cls, context, service_id):
obj = db_api.service_get(context, service_id)
return cls._from_db_object(context, cls(), obj)
@classmethod
def get_all(cls, context):
objs = db_api.service_get_all(context)
return [cls._from_db_object(context, cls(), obj) for obj in objs]
@classmethod
def update(cls, context, obj_id, values=None):
obj = db_api.service_update(context, obj_id, values=values)
return cls._from_db_object(context, cls(), obj)
@classmethod
def delete(cls, context, obj_id):
db_api.service_delete(context, obj_id)

View File

@@ -0,0 +1,112 @@
# Copyright (c) 2015 Ericsson AB
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
from oslo_config import cfg
from oslo_db import options
from oslo_utils import uuidutils
from kingbird.common import config
from kingbird.db import api as api
from kingbird.db.sqlalchemy import api as db_api
from kingbird.tests import base
from kingbird.tests import utils
config.register_options()
get_engine = api.get_engine
class ServiceRegistryTest(base.KingbirdTestCase):
def setup_dummy_db(self):
options.cfg.set_defaults(options.database_opts,
sqlite_synchronous=False)
options.set_defaults(cfg.CONF, connection="sqlite://",
sqlite_db='kingbird.db')
engine = get_engine()
db_api.db_sync(engine)
engine.connect()
def reset_dummy_db(self):
engine = get_engine()
meta = sqlalchemy.MetaData()
meta.reflect(bind=engine)
for table in reversed(meta.sorted_tables):
if table.name == 'migrate_version':
continue
engine.execute(table.delete())
def setUp(self):
super(ServiceRegistryTest, self).setUp()
self.setup_dummy_db()
self.addCleanup(self.reset_dummy_db)
self.ctxt = utils.dummy_context()
def _create_service(self, **kwargs):
values = {
'service_id': 'f9aff81e-bc1f-4119-941d-ad1ea7f31d19',
'host': 'localhost',
'binary': 'kingbird-engine',
'topic': 'engine',
}
values.update(kwargs)
return db_api.service_create(self.ctxt, **values)
def test_service_create_get(self):
service = self._create_service()
ret_service = db_api.service_get(self.ctxt, service.id)
self.assertIsNotNone(ret_service)
self.assertEqual(service.id, ret_service.id)
self.assertEqual(service.binary, ret_service.binary)
self.assertEqual(service.host, ret_service.host)
self.assertEqual(service.topic, ret_service.topic)
self.assertEqual(service.disabled, ret_service.disabled)
self.assertEqual(service.disabled_reason, ret_service.disabled_reason)
self.assertIsNotNone(service.created_at)
self.assertIsNotNone(service.updated_at)
def test_service_get_all(self):
values = []
for i in range(4):
values.append({'service_id': uuidutils.generate_uuid(),
'host': 'host-%s' % i})
[self._create_service(**val) for val in values]
services = db_api.service_get_all(self.ctxt)
self.assertEqual(4, len(services))
def test_service_update(self):
old_service = self._create_service()
old_updated_time = old_service.updated_at
values = {'host': 'host-updated'}
new_service = db_api.service_update(self.ctxt, old_service.id, values)
self.assertEqual('host-updated', new_service.host)
self.assertGreater(new_service.updated_at, old_updated_time)
def test_service_update_values_none(self):
old_service = self._create_service()
old_updated_time = old_service.updated_at
new_service = db_api.service_update(self.ctxt, old_service.id)
self.assertGreater(new_service.updated_at, old_updated_time)
def test_service_delete(self):
service = self._create_service()
db_api.service_delete(self.ctxt, service.id)
res = db_api.service_get(self.ctxt, service.id)
self.assertIsNone(res)

View File

View File

@@ -0,0 +1,60 @@
# Copyright (c) 2015 Ericsson AB.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from kingbird.objects import base as obj_base
from kingbird.tests import base
from oslo_versionedobjects import fields as obj_fields
class TestBaseObject(base.KingbirdTestCase):
def test_base_class(self):
obj = obj_base.KingbirdObject()
self.assertEqual(obj_base.KingbirdObject.OBJ_PROJECT_NAMESPACE,
obj.OBJ_PROJECT_NAMESPACE)
self.assertEqual(obj_base.KingbirdObject.VERSION,
obj.VERSION)
@mock.patch.object(obj_base.KingbirdObject, "obj_reset_changes")
def test_from_db_object(self, mock_obj_reset_ch):
class TestKingbirdObject(obj_base.KingbirdObject,
obj_base.VersionedObjectDictCompat):
fields = {
"key1": obj_fields.StringField(),
"key2": obj_fields.StringField(),
}
obj = TestKingbirdObject()
context = mock.Mock()
db_obj = {
"key1": "value1",
"key2": "value2",
}
res = obj_base.KingbirdObject._from_db_object(context, obj, db_obj)
self.assertIsNotNone(res)
self.assertEqual("value1", obj["key1"])
self.assertEqual("value2", obj["key2"])
self.assertEqual(obj._context, context)
mock_obj_reset_ch.assert_called_once_with()
def test_from_db_object_none(self):
obj = obj_base.KingbirdObject()
db_obj = None
context = mock.Mock()
res = obj_base.KingbirdObject._from_db_object(context, obj, db_obj)
self.assertIsNone(res)