basic schedule service: Service class for binaries

running on hosts

Create the basic Service class for binaries running
on hosts.
A service takes a manager and enables rpc by listening
to queues based on topic. It also periodically runs
tasks on the manager(optional) and reports it state
to the database services table.
Also add some unit tests for Service class.

Change-Id: I241c4757d0b2d1880d1a1a59cce007ca9d1037c7
Partial-Bug: #1527097
This commit is contained in:
chenying 2015-12-18 14:44:00 +08:00
parent bda35bb321
commit e2cea1ed3e
29 changed files with 1758 additions and 10 deletions

View File

@ -1,5 +1,6 @@
include AUTHORS
include ChangeLog
recursive-include smaug *.cfg
exclude .gitignore
exclude .gitreview

View File

@ -77,6 +77,14 @@ if [[ "$Q_ENABLE_SMAUG" == "True" ]]; then
elif [[ "$1" == "stack" && "$2" == "extra" ]]; then
echo_summary "Initializing Smaug Service"
SMAUG_BIN_DIR=$(get_python_exec_prefix)
if is_service_enabled $DATABASE_BACKENDS; then
# (re)create smaug database
recreate_database smaug utf8
# Migrate smaug database
$SMAUG_BIN_DIR/smaug-manage db sync
fi
if is_service_enabled smaug-api; then
run_process smaug-api "$SMAUG_BIN_DIR/smaug-api --config-file $SMAUG_API_CONF"
fi

View File

@ -26,5 +26,6 @@ Routes!=2.0,>=1.12.3;python_version!='2.7'
six>=1.9.0
SQLAlchemy<1.1.0,>=0.9.9
sqlalchemy-migrate>=0.9.6
stevedore>=1.5.0 # Apache-2.0
WebOb>=1.2.3
oslo.i18n>=1.5.0 # Apache-2.0

View File

@ -32,6 +32,10 @@ data_files =
[entry_points]
console_scripts =
smaug-api = smaug.cmd.api:main
smaug-manage = smaug.cmd.manage:main
smaug.database.migration_backend =
sqlalchemy = oslo_db.sqlalchemy.migration
[build_sphinx]
source-dir = doc/source

View File

@ -620,7 +620,7 @@ class Resource(wsgi.Application):
action_args.update(contents)
project_id = action_args.pop("project_id", None)
context = request.environ.get('smuag.context')
context = request.environ.get('smaug.context')
if (context and project_id and (project_id != context.project_id)):
msg = _("Malformed request url")
return Fault(webob.exc.HTTPBadRequest(explanation=msg))

242
smaug/cmd/manage.py Normal file
View File

@ -0,0 +1,242 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
CLI interface for smaug management.
"""
from __future__ import print_function
import os
import sys
from oslo_config import cfg
from oslo_db.sqlalchemy import migration
from oslo_log import log as logging
from smaug import i18n
i18n.enable_lazy()
# Need to register global_opts
from smaug.common import config # noqa
from smaug import context
from smaug import db
from smaug.db import migration as db_migration
from smaug.db.sqlalchemy import api as db_api
from smaug.i18n import _
from smaug import utils
from smaug import version
CONF = cfg.CONF
# Decorators for actions
def args(*args, **kwargs):
def _decorator(func):
func.__dict__.setdefault('args', []).insert(0, (args, kwargs))
return func
return _decorator
class DbCommands(object):
"""Class for managing the database."""
def __init__(self):
pass
@args('version', nargs='?', default=None,
help='Database version')
def sync(self, version=None):
"""Sync the database up to the most recent version."""
return db_migration.db_sync(version)
def version(self):
"""Print the current database version."""
print(db_migration.MIGRATE_REPO_PATH)
print(migration.db_version(db_api.get_engine(),
db_migration.MIGRATE_REPO_PATH,
db_migration.INIT_VERSION))
class VersionCommands(object):
"""Class for exposing the codebase version."""
def __init__(self):
pass
def list(self):
print(version.version_string())
def __call__(self):
self.list()
class ConfigCommands(object):
"""Class for exposing the flags defined by flag_file(s)."""
def __init__(self):
pass
@args('param', nargs='?', default=None,
help='Configuration parameter to display (default: %(default)s)')
def list(self, param=None):
"""List parameters configured for smaug.
Lists all parameters configured for smaug unless an optional argument
is specified. If the parameter is specified we only print the
requested parameter. If the parameter is not found an appropriate
error is produced by .get*().
"""
param = param and param.strip()
if param:
print('%s = %s' % (param, CONF.get(param)))
else:
for key, value in CONF.items():
print('%s = %s' % (key, value))
class ServiceCommands(object):
"""Methods for managing services."""
def list(self):
"""Show a list of all smaug services."""
ctxt = context.get_admin_context()
services = db.service_get_all(ctxt)
print_format = "%-16s %-36s %-16s %-10s %-5s %-10s"
print(print_format % (_('Binary'),
_('Host'),
_('Zone'),
_('Status'),
_('State'),
_('Updated At')))
for svc in services:
alive = utils.service_is_up(svc)
art = ":-)" if alive else "XXX"
status = 'enabled'
if svc['disabled']:
status = 'disabled'
print(print_format % (svc['binary'], svc['host'].partition('.')[0],
svc['availability_zone'], status, art,
svc['updated_at']))
CATEGORIES = {
'config': ConfigCommands,
'db': DbCommands,
'service': ServiceCommands,
'version': VersionCommands,
}
def methods_of(obj):
"""Return non-private methods from an object.
Get all callable methods of an object that don't start with underscore
:return: a list of tuples of the form (method_name, method)
"""
result = []
for i in dir(obj):
if callable(getattr(obj, i)) and not i.startswith('_'):
result.append((i, getattr(obj, i)))
return result
def add_command_parsers(subparsers):
for category in CATEGORIES:
command_object = CATEGORIES[category]()
parser = subparsers.add_parser(category)
parser.set_defaults(command_object=command_object)
category_subparsers = parser.add_subparsers(dest='action')
for (action, action_fn) in methods_of(command_object):
parser = category_subparsers.add_parser(action)
action_kwargs = []
for args, kwargs in getattr(action_fn, 'args', []):
parser.add_argument(*args, **kwargs)
parser.set_defaults(action_fn=action_fn)
parser.set_defaults(action_kwargs=action_kwargs)
category_opt = cfg.SubCommandOpt('category',
title='Command categories',
handler=add_command_parsers)
def get_arg_string(args):
arg = None
if args[0] == '-':
# (Note)zhiteng: args starts with FLAGS.oparser.prefix_chars
# is optional args. Notice that cfg module takes care of
# actual ArgParser so prefix_chars is always '-'.
if args[1] == '-':
# This is long optional arg
arg = args[2:]
else:
arg = args[1:]
else:
arg = args
return arg
def fetch_func_args(func):
fn_args = []
for args, kwargs in getattr(func, 'args', []):
arg = get_arg_string(args[0])
fn_args.append(getattr(CONF.category, arg))
return fn_args
def main():
"""Parse options and call the appropriate class/method."""
CONF.register_cli_opt(category_opt)
script_name = sys.argv[0]
if len(sys.argv) < 2:
print(_("\nOpenStack Smaug version: %(version)s\n") %
{'version': version.version_string()})
print(script_name + " category action [<args>]")
print(_("Available categories:"))
for category in CATEGORIES:
print(_("\t%s") % category)
sys.exit(2)
try:
CONF(sys.argv[1:], project='smaug',
version=version.version_string())
logging.setup(CONF, "smaug")
except cfg.ConfigDirNotFoundError as details:
print(_("Invalid directory: %s") % details)
sys.exit(2)
except cfg.ConfigFilesNotFoundError:
cfgfile = CONF.config_file[-1] if CONF.config_file else None
if cfgfile and not os.access(cfgfile, os.R_OK):
st = os.stat(cfgfile)
print(_("Could not read %s. Re-running with sudo") % cfgfile)
try:
os.execvp('sudo', ['sudo', '-u', '#%s' % st.st_uid] + sys.argv)
except Exception:
print(_('sudo failed, continuing as if nothing happened'))
print(_('Please re-run smaug-manage as root.'))
sys.exit(2)
fn = CONF.category.action_fn
fn_args = fetch_func_args(fn)
fn(*fn_args)

View File

@ -31,7 +31,11 @@ logging.register_options(CONF)
core_opts = [
cfg.StrOpt('api_paste_config',
default="api-paste.ini",
help='File name for the paste.deploy config for smaug-api')
help='File name for the paste.deploy config for smaug-api'),
cfg.StrOpt('state_path',
default='/var/lib/smaug',
deprecated_name='pybasedir',
help="Top-level directory for maintaining smaug's state"),
]
debug_opts = [
@ -41,6 +45,10 @@ CONF.register_cli_opts(core_opts)
CONF.register_cli_opts(debug_opts)
global_opts = [
cfg.IntOpt('service_down_time',
default=60,
help='Maximum time since last check-in for a service to be '
'considered up'),
cfg.StrOpt('scheduler_topic',
default='Smaug-scheduler',
help='The topic that scheduler nodes listen on'),
@ -51,6 +59,9 @@ global_opts = [
default=socket.gethostname(),
help='Name of this node. This can be an opaque identifier. '
'It is not necessarily a host name, FQDN, or IP address.'),
cfg.StrOpt('storage_availability_zone',
default='nova',
help='Availability zone of this node'),
cfg.StrOpt('auth_strategy',
default='keystone',
choices=['noauth', 'keystone'],

16
smaug/db/__init__.py Normal file
View File

@ -0,0 +1,16 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
DB abstraction for smaug
"""
from smaug.db.api import * # noqa

120
smaug/db/api.py Normal file
View File

@ -0,0 +1,120 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Defines interface for DB access.
Functions in this module are imported into the smaug.db namespace. Call these
functions from smaug.db namespace, not the smaug.db.api namespace.
All functions in this module return objects that implement a dictionary-like
interface. Currently, many of these objects are sqlalchemy objects that
implement a dictionary interface. However, a future goal is to have all of
these objects be simple dictionaries.
**Related Flags**
:connection: string specifying the sqlalchemy connection to use, like:
`sqlite:///var/lib/smaug/smaug.sqlite`.
:enable_new_services: when adding a new service to the database, is it in the
pool of available hardware (Default: True)
"""
from oslo_config import cfg
from oslo_db import concurrency as db_concurrency
from oslo_db import options as db_options
db_opts = [
cfg.BoolOpt('enable_new_services',
default=True,
help='Services to be added to the available pool on create'),
]
CONF = cfg.CONF
CONF.register_opts(db_opts)
db_options.set_defaults(CONF)
CONF.set_default('sqlite_db', 'smaug.sqlite', group='database')
_BACKEND_MAPPING = {'sqlalchemy': 'smaug.db.sqlalchemy.api'}
IMPL = db_concurrency.TpoolDbapiWrapper(CONF, _BACKEND_MAPPING)
# The maximum value a signed INT type may have
MAX_INT = 0x7FFFFFFF
###################
def dispose_engine():
"""Force the engine to establish new connections."""
# FIXME(jdg): When using sqlite if we do the dispose
# we seem to lose our DB here. Adding this check
# means we don't do the dispose, but we keep our sqlite DB
# This likely isn't the best way to handle this
if 'sqlite' not in IMPL.get_engine().name:
return IMPL.dispose_engine()
else:
return
###################
def service_destroy(context, service_id):
"""Destroy the service or raise if it does not exist."""
return IMPL.service_destroy(context, service_id)
def service_get(context, service_id):
"""Get a service or raise if it does not exist."""
return IMPL.service_get(context, service_id)
def service_get_by_host_and_topic(context, host, topic):
"""Get a service by host it's on and topic it listens to."""
return IMPL.service_get_by_host_and_topic(context, host, topic)
def service_get_all(context, disabled=None):
"""Get all services."""
return IMPL.service_get_all(context, disabled)
def service_get_all_by_topic(context, topic, disabled=None):
"""Get all services for a given topic."""
return IMPL.service_get_all_by_topic(context, topic, disabled=disabled)
def service_get_by_args(context, host, binary):
"""Get the state of an service by node name and binary."""
return IMPL.service_get_by_args(context, host, binary)
def service_create(context, values):
"""Create a service from the values dictionary."""
return IMPL.service_create(context, values)
def service_update(context, service_id, values):
"""Set the given properties on an service and update it.
Raises NotFound if service does not exist.
"""
return IMPL.service_update(context, service_id, values)

38
smaug/db/base.py Normal file
View File

@ -0,0 +1,38 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base class for classes that need modular database access."""
from oslo_config import cfg
from oslo_utils import importutils
db_driver_opt = cfg.StrOpt('db_driver',
default='smaug.db',
help='Driver to use for database access')
CONF = cfg.CONF
CONF.register_opt(db_driver_opt)
class Base(object):
"""DB driver is injected in the init method."""
def __init__(self, db_driver=None):
# NOTE(mriedem): Without this call, multiple inheritance involving
# the db Base class does not work correctly.
super(Base, self).__init__()
if not db_driver:
db_driver = CONF.db_driver
self.db = importutils.import_module(db_driver) # pylint: disable=C0103
self.db.dispose_engine()

57
smaug/db/migration.py Normal file
View File

@ -0,0 +1,57 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Database setup and migration commands."""
import os
import threading
from oslo_config import cfg
from oslo_db import options
from stevedore import driver
from smaug.db.sqlalchemy import api as db_api
INIT_VERSION = 000
_IMPL = None
_LOCK = threading.Lock()
options.set_defaults(cfg.CONF)
MIGRATE_REPO_PATH = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
'sqlalchemy',
'migrate_repo',
)
def get_backend():
global _IMPL
if _IMPL is None:
with _LOCK:
if _IMPL is None:
_IMPL = driver.DriverManager(
"smaug.database.migration_backend",
cfg.CONF.database.backend).driver
return _IMPL
def db_sync(version=None, init_version=INIT_VERSION, engine=None):
"""Migrate the database to `version` or the most recent version."""
if engine is None:
engine = db_api.get_engine()
return get_backend().db_sync(engine=engine,
abs_path=MIGRATE_REPO_PATH,
version=version,
init_version=init_version)

View File

308
smaug/db/sqlalchemy/api.py Normal file
View File

@ -0,0 +1,308 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of SQLAlchemy backend."""
import functools
import sys
import threading
import time
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_db import options
from oslo_db.sqlalchemy import session as db_session
from oslo_log import log as logging
from oslo_utils import timeutils
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql import func
from smaug.db.sqlalchemy import models
from smaug import exception
from smaug.i18n import _, _LW
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
options.set_defaults(CONF, connection='sqlite:///$state_path/smaug.sqlite')
_LOCK = threading.Lock()
_FACADE = None
def _create_facade_lazily():
global _LOCK
with _LOCK:
global _FACADE
if _FACADE is None:
_FACADE = db_session.EngineFacade(
CONF.database.connection,
**dict(CONF.database)
)
return _FACADE
def get_engine():
facade = _create_facade_lazily()
return facade.get_engine()
def get_session(**kwargs):
facade = _create_facade_lazily()
return facade.get_session(**kwargs)
def dispose_engine():
get_engine().dispose()
_DEFAULT_QUOTA_NAME = 'default'
def get_backend():
"""The backend is this module itself."""
return sys.modules[__name__]
def is_admin_context(context):
"""Indicates if the request context is an administrator."""
if not context:
LOG.warning(_LW('Use of empty request context is deprecated'),
DeprecationWarning)
raise Exception('die')
return context.is_admin
def is_user_context(context):
"""Indicates if the request context is a normal user."""
if not context:
return False
if context.is_admin:
return False
if not context.user_id or not context.project_id:
return False
return True
def authorize_project_context(context, project_id):
"""Ensures a request has permission to access the given project."""
if is_user_context(context):
if not context.project_id:
raise exception.NotAuthorized()
elif context.project_id != project_id:
raise exception.NotAuthorized()
def authorize_user_context(context, user_id):
"""Ensures a request has permission to access the given user."""
if is_user_context(context):
if not context.user_id:
raise exception.NotAuthorized()
elif context.user_id != user_id:
raise exception.NotAuthorized()
def require_admin_context(f):
"""Decorator to require admin request context.
The first argument to the wrapped function must be the context.
"""
def wrapper(*args, **kwargs):
if not is_admin_context(args[0]):
raise exception.AdminRequired()
return f(*args, **kwargs)
return wrapper
def require_context(f):
"""Decorator to require *any* user or admin context.
This does no authorization for user or project access matching, see
:py:func:`authorize_project_context` and
:py:func:`authorize_user_context`.
The first argument to the wrapped function must be the context.
"""
def wrapper(*args, **kwargs):
if not is_admin_context(args[0]) and not is_user_context(args[0]):
raise exception.NotAuthorized()
return f(*args, **kwargs)
return wrapper
def _retry_on_deadlock(f):
"""Decorator to retry a DB API call if Deadlock was received."""
@functools.wraps(f)
def wrapped(*args, **kwargs):
while True:
try:
return f(*args, **kwargs)
except db_exc.DBDeadlock:
LOG.warning(_LW("Deadlock detected when running "
"'%(func_name)s': Retrying..."),
dict(func_name=f.__name__))
# Retry!
time.sleep(0.5)
continue
functools.update_wrapper(wrapped, f)
return wrapped
def model_query(context, *args, **kwargs):
"""Query helper that accounts for context's `read_deleted` field.
:param context: context to query under
:param session: if present, the session to use
:param read_deleted: if present, overrides context's read_deleted field.
:param project_only: if present and context is user-type, then restrict
query to match the context's project_id.
"""
session = kwargs.get('session') or get_session()
read_deleted = kwargs.get('read_deleted') or context.read_deleted
project_only = kwargs.get('project_only')
query = session.query(*args)
if read_deleted == 'no':
query = query.filter_by(deleted=False)
elif read_deleted == 'yes':
pass # omit the filter to include deleted and active
elif read_deleted == 'only':
query = query.filter_by(deleted=True)
else:
raise Exception(
_("Unrecognized read_deleted value '%s'") % read_deleted)
if project_only and is_user_context(context):
query = query.filter_by(project_id=context.project_id)
return query
@require_admin_context
def service_destroy(context, service_id):
session = get_session()
with session.begin():
service_ref = _service_get(context, service_id, session=session)
service_ref.delete(session=session)
@require_admin_context
def _service_get(context, service_id, session=None):
result = model_query(
context,
models.Service,
session=session).\
filter_by(id=service_id).\
first()
if not result:
raise exception.ServiceNotFound(service_id=service_id)
return result
@require_admin_context
def service_get(context, service_id):
return _service_get(context, service_id)
@require_admin_context
def service_get_all(context, disabled=None):
query = model_query(context, models.Service)
if disabled is not None:
query = query.filter_by(disabled=disabled)
return query.all()
@require_admin_context
def service_get_all_by_topic(context, topic, disabled=None):
query = model_query(
context, models.Service, read_deleted="no").\
filter_by(topic=topic)
if disabled is not None:
query = query.filter_by(disabled=disabled)
return query.all()
@require_admin_context
def service_get_by_host_and_topic(context, host, topic):
result = model_query(
context, models.Service, read_deleted="no").\
filter_by(disabled=False).\
filter_by(host=host).\
filter_by(topic=topic).\
first()
if not result:
raise exception.ServiceNotFound(service_id=None)
return result
@require_admin_context
def _service_get_all_topic_subquery(context, session, topic, subq, label):
sort_value = getattr(subq.c, label)
return model_query(context, models.Service,
func.coalesce(sort_value, 0),
session=session, read_deleted="no").\
filter_by(topic=topic).\
filter_by(disabled=False).\
outerjoin((subq, models.Service.host == subq.c.host)).\
order_by(sort_value).\
all()
@require_admin_context
def service_get_by_args(context, host, binary):
results = model_query(context, models.Service).\
filter_by(host=host).\
filter_by(binary=binary).\
all()
for result in results:
if host == result['host']:
return result
raise exception.HostBinaryNotFound(host=host, binary=binary)
@require_admin_context
def service_create(context, values):
service_ref = models.Service()
service_ref.update(values)
if not CONF.enable_new_services:
service_ref.disabled = True
session = get_session()
with session.begin():
service_ref.save(session)
return service_ref
@require_admin_context
def service_update(context, service_id, values):
session = get_session()
with session.begin():
service_ref = _service_get(context, service_id, session=session)
if ('disabled' in values):
service_ref['modified_at'] = timeutils.utcnow()
service_ref['updated_at'] = literal_column('updated_at')
service_ref.update(values)
return service_ref

View File

@ -0,0 +1,4 @@
This is a database migration repository.
More information at
http://code.google.com/p/sqlalchemy-migrate/

View File

@ -0,0 +1,23 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from smaug.db.sqlalchemy import migrate_repo
from migrate.versioning.shell import main
if __name__ == '__main__':
main(debug='False',
repository=os.path.abspath(os.path.dirname(migrate_repo.__file__)))

View File

@ -0,0 +1,20 @@
[db_settings]
# Used to identify which repository this database is versioned under.
# You can use the name of your project.
repository_id=smaug
# The name of the database table used to track the schema version.
# This name shouldn't already be used by your project.
# If this is changed once a database is under version control, you'll need to
# change the table name in each database too.
version_table=migrate_version
# When committing a change script, Migrate will attempt to generate the
# sql for all supported databases; normally, if one of them fails - probably
# because you don't have that database installed - it is ignored and the
# commit continues, perhaps ending successfully.
# Databases in this list MUST compile successfully during a commit, or the
# entire commit will fail. List the databases your application will actually
# be using to ensure your updates to that database work properly.
# This must be a list; example: ['postgres','sqlite']
required_dbs=[]

View File

@ -0,0 +1,72 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Boolean, Column, DateTime
from sqlalchemy import Integer, MetaData, String, Table
def define_tables(meta):
services = Table(
'services', meta,
Column('created_at', DateTime),
Column('updated_at', DateTime),
Column('deleted_at', DateTime),
Column('deleted', Boolean),
Column('id', Integer, primary_key=True, nullable=False),
Column('host', String(length=255)),
Column('binary', String(length=255)),
Column('topic', String(length=255)),
Column('report_count', Integer, nullable=False),
Column('disabled', Boolean),
Column('availability_zone', String(length=255)),
Column('disabled_reason', String(length=255)),
Column('modified_at', DateTime),
mysql_engine='InnoDB'
)
return [services]
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# create all tables
# Take care on create order for those with FK dependencies
tables = define_tables(meta)
for table in tables:
table.create()
if migrate_engine.name == "mysql":
tables = ["migrate_version",
"services"]
migrate_engine.execute("SET foreign_key_checks = 0")
for table in tables:
migrate_engine.execute(
"ALTER TABLE %s CONVERT TO CHARACTER SET utf8" % table)
migrate_engine.execute("SET foreign_key_checks = 1")
migrate_engine.execute(
"ALTER DATABASE %s DEFAULT CHARACTER SET utf8" %
migrate_engine.url.database)
migrate_engine.execute("ALTER TABLE %s Engine=InnoDB" % table)
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
tables = define_tables(meta)
tables.reverse()
for table in tables:
table.drop()

View File

@ -0,0 +1,75 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SQLAlchemy models for smaug data.
"""
from oslo_config import cfg
from oslo_db.sqlalchemy import models
from oslo_utils import timeutils
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import DateTime, Boolean
CONF = cfg.CONF
BASE = declarative_base()
class SmaugBase(models.TimestampMixin,
models.ModelBase):
"""Base class for Smaug Models."""
__table_args__ = {'mysql_engine': 'InnoDB'}
deleted_at = Column(DateTime)
deleted = Column(Boolean, default=False)
metadata = None
def delete(self, session):
"""Delete this object."""
self.deleted = True
self.deleted_at = timeutils.utcnow()
self.save(session=session)
class Service(BASE, SmaugBase):
"""Represents a running service on a host."""
__tablename__ = 'services'
id = Column(Integer, primary_key=True)
host = Column(String(255)) # , ForeignKey('hosts.id'))
binary = Column(String(255))
topic = Column(String(255))
report_count = Column(Integer, nullable=False, default=0)
disabled = Column(Boolean, default=False)
availability_zone = Column(String(255), default='smaug')
disabled_reason = Column(String(255))
# adding column modified_at to contain timestamp
# for manual enable/disable of smaug services
# updated_at column will now contain timestamps for
# periodic updates
modified_at = Column(DateTime)
def register_models():
"""Register Models and create metadata.
Called from smaug.db.sqlalchemy.__init__ as part of loading the driver,
it will never need to be called explicitly elsewhere unless the
connection is lost and needs to be reestablished.
"""
from sqlalchemy import create_engine
models = (Service,)
engine = create_engine(CONF.database.connection, echo=False)
for model in models:
model.metadata.create_all(engine)

View File

@ -169,3 +169,11 @@ class InvalidContentType(Invalid):
class PasteAppNotFound(NotFound):
message = _("Could not load paste app '%(name)s' from %(path)s")
class ServiceNotFound(NotFound):
message = _("Service %(service_id)s could not be found.")
class HostBinaryNotFound(NotFound):
message = _("Could not find binary %(binary)s on host %(host)s.")

112
smaug/manager.py Normal file
View File

@ -0,0 +1,112 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base Manager class.
Managers are responsible for a certain aspect of the system. It is a logical
grouping of code relating to a portion of the system. In general other
components should be using the manager to make changes to the components that
it is responsible for.
We have adopted a basic strategy of Smart managers and dumb data, which means
rather than attaching methods to data objects, components should call manager
methods that act on the data.
Methods on managers that can be executed locally should be called directly. If
a particular method must execute on a remote host, this should be done via rpc
to the service that wraps the manager
Managers should be responsible for most of the db access, and
non-implementation specific data. Anything implementation specific that can't
be generalized should be done by the Driver.
Managers will often provide methods for initial setup of a host or periodic
tasks to a wrapping service.
This module provides Manager, a base class for managers.
"""
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import periodic_task
from smaug.db import base
from smaug import version
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class PeriodicTasks(periodic_task.PeriodicTasks):
def __init__(self):
super(PeriodicTasks, self).__init__(CONF)
class Manager(base.Base, PeriodicTasks):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, host=None, db_driver=None):
if not host:
host = CONF.host
self.host = host
self.additional_endpoints = []
super(Manager, self).__init__(db_driver)
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self):
"""Handle initialization if this is a standalone service.
A hook point for services to execute tasks before the services are made
available (i.e. showing up on RPC and starting to accept RPC calls) to
other components. Child classes should override this method.
"""
pass
def init_host_with_rpc(self):
"""A hook for service to do jobs after RPC is ready.
Like init_host(), this method is a hook where services get a chance
to execute tasks that *need* RPC. Child classes should override
this method.
"""
pass
def service_version(self):
return version.version_string()
def service_config(self):
config = {}
for key in CONF:
config[key] = CONF.get(key, None)
return config
def is_working(self):
"""Method indicating if service is working correctly.
This method is supposed to be overriden by subclasses and return if
manager is working correctly.
"""
return True

View File

@ -13,23 +13,43 @@
"""Generic Node base class for all workers that run on hosts."""
import inspect
import os
import random
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import loopingcall
from oslo_service import service
from oslo_utils import importutils
from smaug import context
from smaug import db
from smaug import exception
from smaug.i18n import _
from smaug.i18n import _, _LE, _LI, _LW
from smaug import rpc
from smaug import version
from smaug.wsgi import common as wsgi_common
from smaug.wsgi import eventlet_server as wsgi
LOG = logging.getLogger(__name__)
service_opts = [
cfg.IntOpt('report_interval',
default=10,
help='Interval, in seconds, between nodes reporting state '
'to datastore'),
cfg.IntOpt('periodic_interval',
default=60,
help='Interval, in seconds, between running periodic tasks'),
cfg.IntOpt('periodic_fuzzy_delay',
default=60,
help='Range, in seconds, to randomly delay when starting the'
' periodic task scheduler to reduce stampeding.'
' (Disable by setting to 0)'),
cfg.StrOpt('osapi_smaug_listen',
default="0.0.0.0",
help='IP address on which OpenStack Smaug API listens'),
@ -44,6 +64,243 @@ CONF = cfg.CONF
CONF.register_opts(service_opts)
class Service(service.Service):
"""Service object for binaries running on hosts.
A service takes a manager and enables rpc by listening to queues based
on topic. It also periodically runs tasks on the manager and reports
it state to the database services table.
"""
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, periodic_fuzzy_delay=None,
service_name=None, *args, **kwargs):
super(Service, self).__init__()
if not rpc.initialized():
rpc.init(CONF)
self.host = host
self.binary = binary
self.topic = topic
self.manager_class_name = manager
manager_class = importutils.import_class(self.manager_class_name)
self.manager = manager_class(host=self.host,
service_name=service_name,
*args, **kwargs)
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.basic_config_check()
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
self.rpcserver = None
def start(self):
version_string = version.version_string()
LOG.info(_LI('Starting %(topic)s node (version %(version_string)s)'),
{'topic': self.topic, 'version_string': version_string})
self.model_disconnected = False
self.manager.init_host()
ctxt = context.get_admin_context()
try:
service_ref = db.service_get_by_args(ctxt,
self.host,
self.binary)
self.service_id = service_ref['id']
except exception.NotFound:
self._create_service_ref(ctxt)
LOG.debug("Creating RPC server for service %s", self.topic)
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
endpoints.extend(self.manager.additional_endpoints)
self.rpcserver = rpc.get_server(target, endpoints)
self.rpcserver.start()
self.manager.init_host_with_rpc()
if self.report_interval:
pulse = loopingcall.FixedIntervalLoopingCall(
self.report_state)
pulse.start(interval=self.report_interval,
initial_delay=self.report_interval)
self.timers.append(pulse)
if self.periodic_interval:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
periodic = loopingcall.FixedIntervalLoopingCall(
self.periodic_tasks)
periodic.start(interval=self.periodic_interval,
initial_delay=initial_delay)
self.timers.append(periodic)
def basic_config_check(self):
"""Perform basic config checks before starting service."""
# Make sure report interval is less than service down time
if self.report_interval:
if CONF.service_down_time <= self.report_interval:
new_down_time = int(self.report_interval * 2.5)
LOG.warning(
_LW("Report interval must be less than service down "
"time. Current config service_down_time: "
"%(service_down_time)s, report_interval for this: "
"service is: %(report_interval)s. Setting global "
"service_down_time to: %(new_down_time)s"),
{'service_down_time': CONF.service_down_time,
'report_interval': self.report_interval,
'new_down_time': new_down_time})
CONF.set_override('service_down_time', new_down_time)
def _create_service_ref(self, context):
zone = CONF.storage_availability_zone
service_ref = db.service_create(context,
{'host': self.host,
'binary': self.binary,
'topic': self.topic,
'report_count': 0,
'availability_zone': zone})
self.service_id = service_ref['id']
def __getattr__(self, key):
manager = self.__dict__.get('manager', None)
return getattr(manager, key)
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_interval=None,
periodic_fuzzy_delay=None, service_name=None):
"""Instantiates class and passes back application object.
:param host: defaults to CONF.host
:param binary: defaults to basename of executable
:param topic: defaults to bin_name - 'smaug-' part
:param manager: defaults to CONF.<topic>_manager
:param report_interval: defaults to CONF.report_interval
:param periodic_interval: defaults to CONF.periodic_interval
:param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
"""
if not host:
host = CONF.host
if not binary:
binary = os.path.basename(inspect.stack()[-1][1])
if not topic:
topic = binary
if not manager:
subtopic = topic.rpartition('smaug-')[2]
manager = CONF.get('%s_manager' % subtopic, None)
if report_interval is None:
report_interval = CONF.report_interval
if periodic_interval is None:
periodic_interval = CONF.periodic_interval
if periodic_fuzzy_delay is None:
periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
service_obj = cls(host, binary, topic, manager,
report_interval=report_interval,
periodic_interval=periodic_interval,
periodic_fuzzy_delay=periodic_fuzzy_delay,
service_name=service_name)
return service_obj
def kill(self):
"""Destroy the service object in the datastore."""
self.stop()
try:
db.service_destroy(context.get_admin_context(), self.service_id)
except exception.NotFound:
LOG.warning(_LW('Service killed that has no database entry'))
def stop(self):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.rpcserver.stop()
except Exception:
pass
for x in self.timers:
try:
x.stop()
except Exception:
pass
self.timers = []
super(Service, self).stop()
def wait(self):
for x in self.timers:
try:
x.wait()
except Exception:
pass
if self.rpcserver:
self.rpcserver.wait()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
ctxt = context.get_admin_context()
self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
def report_state(self):
"""Update the state of this service in the datastore."""
if not self.manager.is_working():
# NOTE(dulek): If manager reports a problem we're not sending
# heartbeats - to indicate that service is actually down.
LOG.error(_LE('Manager for service %(binary)s %(host)s is '
'reporting problems, not sending heartbeat. '
'Service will appear "down".'),
{'binary': self.binary,
'host': self.host})
return
ctxt = context.get_admin_context()
zone = CONF.storage_availability_zone
state_catalog = {}
try:
try:
service_ref = db.service_get(ctxt, self.service_id)
except exception.NotFound:
LOG.debug('The service database object disappeared, '
'recreating it.')
self._create_service_ref(ctxt)
service_ref = db.service_get(ctxt, self.service_id)
state_catalog['report_count'] = service_ref['report_count'] + 1
if zone != service_ref['availability_zone']:
state_catalog['availability_zone'] = zone
db.service_update(ctxt,
self.service_id, state_catalog)
# TODO(termie): make this pattern be more elegant.
if getattr(self, 'model_disconnected', False):
self.model_disconnected = False
LOG.error(_LE('Recovered model server connection!'))
except db_exc.DBConnectionError:
if not getattr(self, 'model_disconnected', False):
self.model_disconnected = True
LOG.exception(_LE('model server went away'))
# NOTE(jsbryant) Other DB errors can happen in HA configurations.
# such errors shouldn't kill this thread, so we handle them here.
except db_exc.DBError:
if not getattr(self, 'model_disconnected', False):
self.model_disconnected = True
LOG.exception(_LE('DBError encountered: '))
except Exception:
if not getattr(self, 'model_disconnected', False):
self.model_disconnected = True
LOG.exception(_LE('Exception encountered: '))
class WSGIService(service.ServiceBase):
"""Provides ability to launch API from a 'paste' configuration."""

View File

@ -1,7 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2010-2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -13,24 +9,70 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import shutil
import fixtures
from oslo_config import cfg
from oslo_log import log
from oslotest import base
from smaug.common import config # noqa Need to register global_opts
from smaug.db import migration
from smaug.db.sqlalchemy import api as sqla_api
from smaug.tests.unit import conf_fixture
test_opts = [
]
cfg.StrOpt('sqlite_clean_db',
default='clean.sqlite',
help='File name of clean sqlite db'), ]
CONF = cfg.CONF
CONF.register_opts(test_opts)
LOG = log.getLogger(__name__)
_DB_CACHE = None
class Database(fixtures.Fixture):
def __init__(self, db_api, db_migrate, sql_connection,
sqlite_db, sqlite_clean_db):
self.sql_connection = sql_connection
self.sqlite_db = sqlite_db
self.sqlite_clean_db = sqlite_clean_db
# Suppress logging for test runs
migrate_logger = logging.getLogger('migrate')
migrate_logger.setLevel(logging.WARNING)
self.engine = db_api.get_engine()
self.engine.dispose()
conn = self.engine.connect()
db_migrate.db_sync()
if sql_connection == "sqlite://":
conn = self.engine.connect()
self._DB = "".join(line for line in conn.connection.iterdump())
self.engine.dispose()
else:
cleandb = os.path.join(CONF.state_path, sqlite_clean_db)
testdb = os.path.join(CONF.state_path, sqlite_db)
shutil.copyfile(testdb, cleandb)
def setUp(self):
super(Database, self).setUp()
if self.sql_connection == "sqlite://":
conn = self.engine.connect()
conn.connection.executescript(self._DB)
self.addCleanup(self.engine.dispose)
else:
shutil.copyfile(
os.path.join(CONF.state_path, self.sqlite_clean_db),
os.path.join(CONF.state_path, self.sqlite_db))
class TestCase(base.BaseTestCase):
@ -43,6 +85,17 @@ class TestCase(base.BaseTestCase):
conf_fixture.set_defaults(CONF)
CONF([], default_config_files=[])
CONF.set_default('connection', 'sqlite://', 'database')
CONF.set_default('sqlite_synchronous', False, 'database')
global _DB_CACHE
if not _DB_CACHE:
_DB_CACHE = Database(sqla_api, migration,
sql_connection=CONF.database.connection,
sqlite_db=CONF.database.sqlite_db,
sqlite_clean_db=CONF.sqlite_clean_db)
self.useFixture(_DB_CACHE)
self.override_config('policy_file',
os.path.join(
os.path.abspath(

View File

@ -10,7 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_config import cfg
@ -20,7 +20,11 @@ CONF.import_opt('policy_file', 'smaug.policy', group='oslo_policy')
def set_defaults(conf):
conf.set_default('connection', 'sqlite://', group='database')
conf.set_default('sqlite_synchronous', False, group='database')
conf.set_default('policy_file', 'smaug.tests.unit/policy.json',
group='oslo_policy')
conf.set_default('policy_dirs', [], group='oslo_policy')
conf.set_default('auth_strategy', 'noauth')
conf.set_default('state_path', os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..', '..')))

View File

View File

@ -0,0 +1,98 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for Models Database."""
from oslo_config import cfg
from smaug import context
from smaug import db
from smaug import exception
from smaug.tests import base
CONF = cfg.CONF
class ServicesDbTestCase(base.TestCase):
"""Test cases for Services database table."""
def setUp(self):
super(ServicesDbTestCase, self).setUp()
self.ctxt = context.RequestContext(user_id='user_id',
project_id='project_id',
is_admin=True)
def test_services_create(self):
service_ref = db.service_create(self.ctxt,
{'host': 'hosttest',
'binary': 'binarytest',
'topic': 'topictest',
'report_count': 0,
'availability_zone': 'zonetest'})
self.assertEqual(service_ref['host'], 'hosttest')
def test_services_get(self):
service_ref = db.service_create(self.ctxt,
{'host': 'hosttest1',
'binary': 'binarytest1',
'topic': 'topictest1',
'report_count': 0,
'availability_zone': 'zonetest1'})
service_get_ref = db.service_get(self.ctxt, service_ref['id'])
self.assertEqual(service_ref['host'], 'hosttest1')
self.assertEqual(service_get_ref['host'], 'hosttest1')
def test_service_destroy(self):
service_ref = db.service_create(self.ctxt,
{'host': 'hosttest2',
'binary': 'binarytest2',
'topic': 'topictest2',
'report_count': 0,
'availability_zone': 'zonetest2'})
service_id = service_ref['id']
db.service_destroy(self.ctxt, service_id)
self.assertRaises(exception.ServiceNotFound, db.service_get,
self.ctxt, service_id)
def test_service_update(self):
service_ref = db.service_create(self.ctxt,
{'host': 'hosttest3',
'binary': 'binarytest3',
'topic': 'topictest3',
'report_count': 0,
'availability_zone': 'zonetest3'})
service_id = service_ref['id']
service_update_ref = db.service_update(self.ctxt, service_id,
{'host': 'hosttest4',
'binary': 'binarytest4',
'topic': 'topictest4',
'report_count': 0,
'availability_zone':
'zonetest4'})
self.assertEqual(service_ref['host'], 'hosttest3')
self.assertEqual(service_update_ref['host'], 'hosttest4')
def test_service_get_by_host_and_topic(self):
service_ref = db.service_create(self.ctxt,
{'host': 'hosttest5',
'binary': 'binarytest5',
'topic': 'topictest5',
'report_count': 0,
'availability_zone': 'zonetest5'})
service_get_ref = db.service_get_by_host_and_topic(self.ctxt,
'hosttest5',
'topictest5')
self.assertEqual(service_ref['host'], 'hosttest5')
self.assertEqual(service_get_ref['host'], 'hosttest5')

View File

@ -17,14 +17,220 @@ Unit Tests for remote procedure calls using queue
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_db import exception as db_exc
from smaug import context
from smaug import db
from smaug import exception
from smaug import manager
from smaug import rpc
from smaug import service
from smaug.tests import base
from smaug.wsgi import common as wsgi
test_service_opts = [
cfg.StrOpt("fake_manager",
default="smaug.tests.unit.test_service.FakeManager",
help="Manager for testing"), ]
CONF = cfg.CONF
CONF.register_opts(test_service_opts)
class FakeManager(manager.Manager):
"""Fake manager for tests."""
def __init__(self, host=None,
db_driver=None, service_name=None):
super(FakeManager, self).__init__(host=host,
db_driver=db_driver)
def test_method(self):
return 'manager'
class ExtendedService(service.Service):
def test_method(self):
return 'service'
class ServiceManagerTestCase(base.TestCase):
"""Test cases for Services."""
def test_message_gets_to_manager(self):
serv = service.Service('test',
'test',
'test',
'smaug.tests.unit.test_service.FakeManager')
serv.start()
self.assertEqual('manager', serv.test_method())
def test_override_manager_method(self):
serv = ExtendedService('test',
'test',
'test',
'smaug.tests.unit.test_service.FakeManager')
serv.start()
self.assertEqual('service', serv.test_method())
class ServiceFlagsTestCase(base.TestCase):
def test_service_enabled_on_create_based_on_flag(self):
self.flags(enable_new_services=True)
host = 'foo'
binary = 'smaug-fake'
app = service.Service.create(host=host, binary=binary)
app.start()
app.stop()
ref = db.service_get(context.get_admin_context(), app.service_id)
db.service_destroy(context.get_admin_context(), app.service_id)
self.assertFalse(ref['disabled'])
def test_service_disabled_on_create_based_on_flag(self):
self.flags(enable_new_services=False)
host = 'foo'
binary = 'smaug-fake'
app = service.Service.create(host=host, binary=binary)
app.start()
app.stop()
ref = db.service_get(context.get_admin_context(), app.service_id)
db.service_destroy(context.get_admin_context(), app.service_id)
self.assertTrue(ref['disabled'])
class ServiceTestCase(base.TestCase):
"""Test cases for Services."""
def setUp(self):
super(ServiceTestCase, self).setUp()
self.host = 'foo'
self.binary = 'smaug-fake'
self.topic = 'fake'
def test_create(self):
app = service.Service.create(host=self.host,
binary=self.binary,
topic=self.topic)
self.assertTrue(app)
def test_report_state_newly_disconnected(self):
service_ref = {'host': self.host,
'binary': self.binary,
'topic': self.topic,
'report_count': 0,
'availability_zone': 'nova',
'id': 1}
with mock.patch.object(service, 'db') as mock_db:
mock_db.service_get_by_args.side_effect = exception.NotFound()
mock_db.service_create.return_value = service_ref
mock_db.service_get.side_effect = db_exc.DBConnectionError()
serv = service.Service(
self.host,
self.binary,
self.topic,
'smaug.tests.unit.test_service.FakeManager'
)
serv.start()
serv.report_state()
self.assertTrue(serv.model_disconnected)
self.assertFalse(mock_db.service_update.called)
def test_report_state_disconnected_DBError(self):
service_ref = {'host': self.host,
'binary': self.binary,
'topic': self.topic,
'report_count': 0,
'availability_zone': 'nova',
'id': 1}
with mock.patch.object(service, 'db') as mock_db:
mock_db.service_get_by_args.side_effect = exception.NotFound()
mock_db.service_create.return_value = service_ref
mock_db.service_get.side_effect = db_exc.DBError()
serv = service.Service(
self.host,
self.binary,
self.topic,
'smaug.tests.unit.test_service.FakeManager'
)
serv.start()
serv.report_state()
self.assertTrue(serv.model_disconnected)
self.assertFalse(mock_db.service_update.called)
def test_report_state_newly_connected(self):
service_ref = {'host': self.host,
'binary': self.binary,
'topic': self.topic,
'report_count': 0,
'availability_zone': 'nova',
'id': 1}
with mock.patch.object(service, 'db') as mock_db:
mock_db.service_get_by_args.side_effect = exception.NotFound()
mock_db.service_create.return_value = service_ref
mock_db.service_get.return_value = service_ref
serv = service.Service(
self.host,
self.binary,
self.topic,
'smaug.tests.unit.test_service.FakeManager'
)
serv.start()
serv.model_disconnected = True
serv.report_state()
self.assertFalse(serv.model_disconnected)
self.assertTrue(mock_db.service_update.called)
def test_report_state_manager_not_working(self):
service_ref = {'host': self.host,
'binary': self.binary,
'topic': self.topic,
'report_count': 0,
'availability_zone': 'nova',
'id': 1}
with mock.patch('smaug.db') as mock_db:
mock_db.service_get.return_value = service_ref
serv = service.Service(
self.host,
self.binary,
self.topic,
'smaug.tests.unit.test_service.FakeManager'
)
serv.manager.is_working = mock.Mock(return_value=False)
serv.start()
serv.report_state()
serv.manager.is_working.assert_called_once_with()
self.assertFalse(mock_db.service_update.called)
def test_service_with_long_report_interval(self):
self.override_config('service_down_time', 10)
self.override_config('report_interval', 10)
service.Service.create(
binary="test_service",
manager="smaug.tests.unit.test_service.FakeManager")
self.assertEqual(25, CONF.service_down_time)
@mock.patch.object(rpc, 'get_server')
@mock.patch('smaug.db')
def test_service_stop_waits_for_rpcserver(self, mock_db, mock_rpc):
serv = service.Service(
self.host,
self.binary,
self.topic,
'smaug.tests.unit.test_service.FakeManager'
)
serv.start()
serv.stop()
serv.wait()
serv.rpcserver.start.assert_called_once_with()
serv.rpcserver.stop.assert_called_once_with()
serv.rpcserver.wait.assert_called_once_with()
class TestWSGIService(base.TestCase):

View File

@ -15,6 +15,7 @@ import os
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
import six
from smaug import exception
@ -68,3 +69,12 @@ def check_string_length(value, name, min_length=0, max_length=None):
msg = _("%(name)s has more than %(max_length)s "
"characters.") % {'name': name, 'max_length': max_length}
raise exception.InvalidInput(message=msg)
def service_is_up(service):
"""Check whether a service is up based on last heartbeat."""
last_heartbeat = service['updated_at'] or service['created_at']
elapsed = (timeutils.utcnow(with_timezone=False) -
last_heartbeat).total_seconds()
return abs(elapsed) <= CONF.service_down_time