Add db model and api for worker

The db model is called 'Worker', it's used to store the rally worker
service status. And the following db apis are added:
 * register_worker
 * get_worker
 * unregister_worker
 * update_worker

Change-Id: I38c792cc133dbace10eb7f2d4d202ca05ec7f9cc
This commit is contained in:
liyingjun
2014-06-24 22:22:31 +08:00
parent d3b93b8e1f
commit 37e502bd77
5 changed files with 131 additions and 0 deletions

View File

@@ -317,3 +317,45 @@ def verification_result_create(verification_uuid, values):
:returns: TaskResult instance appended. :returns: TaskResult instance appended.
""" """
return IMPL.verification_result_create(verification_uuid, values) return IMPL.verification_result_create(verification_uuid, values)
def register_worker(values):
"""Register a new worker service at the specified hostname.
:param values: A dict of values which must contain the following:
{
'hostname': the unique hostname which identifies
this worker service.
}
:returns: A worker.
:raises: WorkerAlreadyRegistered
"""
return IMPL.register_worker(values)
def get_worker(hostname):
"""Retrieve a worker service record from the database.
:param hostname: The hostname of the worker service.
:returns: A worker.
:raises: WorkerNotFound
"""
return IMPL.get_worker(hostname)
def unregister_worker(hostname):
"""Unregister this worker with the service registry.
:param hostname: The hostname of the worker service.
:raises: WorkerNotFound
"""
IMPL.unregister_worker(hostname)
def update_worker(hostname):
"""Mark a worker as active by updating its 'updated_at' property.
:param hostname: The hostname of this worker service.
:raises: WorkerNotFound
"""
IMPL.update_worker(hostname)

View File

@@ -17,12 +17,15 @@ SQLAlchemy implementation for DB.API
""" """
from oslo.config import cfg from oslo.config import cfg
from oslo.db import exception as db_exc
from oslo.db.sqlalchemy import session as db_session from oslo.db.sqlalchemy import session as db_session
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy.orm.exc import NoResultFound
from rally.db.sqlalchemy import models from rally.db.sqlalchemy import models
from rally import exceptions from rally import exceptions
from rally.openstack.common.gettextutils import _ from rally.openstack.common.gettextutils import _
from rally.openstack.common import timeutils
CONF = cfg.CONF CONF = cfg.CONF
@@ -281,3 +284,34 @@ class Connection(object):
raise exceptions.NotFoundException( raise exceptions.NotFoundException(
"No results for following UUID '%s'." % verification_uuid) "No results for following UUID '%s'." % verification_uuid)
return result return result
def register_worker(self, values):
try:
worker = models.Worker()
worker.update(values)
worker.update({'updated_at': timeutils.utcnow()})
worker.save()
return worker
except db_exc.DBDuplicateEntry:
raise exceptions.WorkerAlreadyRegistered(
worker=values['hostname'])
def get_worker(self, hostname):
try:
return (self.model_query(models.Worker).
filter_by(hostname=hostname).one())
except NoResultFound:
raise exceptions.WorkerNotFound(worker=hostname)
def unregister_worker(self, hostname):
count = (self.model_query(models.Worker).
filter_by(hostname=hostname).delete())
if count == 0:
raise exceptions.WorkerNotFound(worker=hostname)
def update_worker(self, hostname):
count = (self.model_query(models.Worker).
filter_by(hostname=hostname).
update({'updated_at': timeutils.utcnow()}))
if count == 0:
raise exceptions.WorkerNotFound(worker=hostname)

View File

@@ -21,6 +21,7 @@ import uuid
from oslo.db.sqlalchemy import models from oslo.db.sqlalchemy import models
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import schema
from sqlalchemy import types from sqlalchemy import types
from rally import consts from rally import consts
@@ -220,6 +221,15 @@ class VerificationResult(BASE, RallyBase):
data = sa.Column(sa_types.BigMutableJSONEncodedDict, nullable=False) data = sa.Column(sa_types.BigMutableJSONEncodedDict, nullable=False)
class Worker(BASE, RallyBase):
__tablename__ = "workers"
__table_args__ = (
schema.UniqueConstraint('hostname', name='uniq_worker@hostname'),
)
id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
hostname = sa.Column(sa.String(255))
def create_db(): def create_db():
from rally.db.sqlalchemy import api as sa_api from rally.db.sqlalchemy import api as sa_api

View File

@@ -255,3 +255,11 @@ class ImageCleanUpException(RallyException):
class IncompatiblePythonVersion(RallyException): class IncompatiblePythonVersion(RallyException):
msg_fmt = _("Incompatible python version found '%(version)s', " msg_fmt = _("Incompatible python version found '%(version)s', "
"required at least python>=2.7.x") "required at least python>=2.7.x")
class WorkerNotFound(NotFoundException):
msg_fmt = _("Worker %(worker)s could not be found")
class WorkerAlreadyRegistered(RallyException):
msg_fmt = _("Worker %(worker)s already registered")

View File

@@ -418,3 +418,40 @@ class VerificationTestCase(test.DBTestCase):
self.assertEqual(verification['time'], db_verification['time']) self.assertEqual(verification['time'], db_verification['time'])
self.assertEqual(verification['errors'], db_verification['errors']) self.assertEqual(verification['errors'], db_verification['errors'])
self.assertEqual(verification['failures'], db_verification['failures']) self.assertEqual(verification['failures'], db_verification['failures'])
class WorkerTestCase(test.DBTestCase):
def setUp(self):
super(WorkerTestCase, self).setUp()
self.worker = db.register_worker({'hostname': 'test'})
def test_register_worker_duplicate(self):
self.assertRaises(exceptions.WorkerAlreadyRegistered,
db.register_worker, {'hostname': 'test'})
def test_get_worker(self):
worker = db.get_worker('test')
self.assertEqual(self.worker['id'], worker['id'])
self.assertEqual(self.worker['hostname'], worker['hostname'])
def test_get_worker_not_found(self):
self.assertRaises(exceptions.WorkerNotFound,
db.get_worker, 'notfound')
def test_unregister_worker(self):
db.unregister_worker('test')
self.assertRaises(exceptions.WorkerNotFound,
db.get_worker, 'test')
def test_unregister_worker_not_found(self):
self.assertRaises(exceptions.WorkerNotFound,
db.unregister_worker, 'fake')
def test_update_worker(self):
db.update_worker('test')
worker = db.get_worker('test')
self.assertNotEqual(self.worker['updated_at'], worker['updated_at'])
def test_update_worker_not_found(self):
self.assertRaises(exceptions.WorkerNotFound,
db.update_worker, 'fake')