Conductors maintan driver list in the DB

To facilitate exposing a list of drivers in the API,
and keep track of which ironic-conductor instance status,
this patch ...
* adds a new 'conductors' table in the DB
* adds a Conductor object class (with intentionally limited
  functionality)
* adds db/api methods for register/unregister/get/touch
* adds periodic task to conductor to maintain its updated_at field
* adds an additional db/api method to retrieve a list of drivers
  which are registered by active conductor instances

Change-Id: I1ebdb92d5c2d6ad1a6d1717dd13ff51be181ccc0
This commit is contained in:
Devananda van der Veen 2013-10-14 17:21:37 -07:00
parent 4e892a1fb3
commit d40f8e3935
15 changed files with 589 additions and 7 deletions

View File

@ -286,6 +286,14 @@ class ChassisNotFound(NotFound):
message = _("Chassis %(chassis)s could not be found.")
class ConductorNotFound(NotFound):
message = _("Conductor %(conductor)s could not be found.")
class ConductorAlreadyRegistered(IronicException):
message = _("Conductor %(conductor)s already registered.")
class PowerStateFailure(InvalidState):
message = _("Failed to set node power state to %(pstate)s.")

View File

@ -23,6 +23,7 @@ from oslo.config import cfg
from ironic.openstack.common import context
from ironic.openstack.common import log
from ironic.openstack.common import periodic_task
from ironic.openstack.common import rpc
from ironic.openstack.common.rpc import service as rpc_service
@ -41,7 +42,7 @@ cfg.CONF.register_opts([
])
class PeriodicService(rpc_service.Service):
class PeriodicService(rpc_service.Service, periodic_task.PeriodicTasks):
def start(self):
super(PeriodicService, self).start()

View File

@ -37,6 +37,7 @@ node; these locks are represented by the
:py:class:`ironic.conductor.task_manager.TaskManager` class.
"""
from ironic.common import driver_factory
from ironic.common import exception
from ironic.common import service
from ironic.common import states
@ -45,6 +46,7 @@ from ironic.db import api as dbapi
from ironic.objects import base as objects_base
from ironic.openstack.common import excutils
from ironic.openstack.common import log
from ironic.openstack.common import periodic_task
MANAGER_TOPIC = 'ironic.conductor_manager'
@ -65,6 +67,21 @@ class ConductorManager(service.PeriodicService):
super(ConductorManager, self).start()
self.dbapi = dbapi.get_instance()
df = driver_factory.DriverFactory()
drivers = df.names
try:
self.dbapi.register_conductor({'hostname': self.host,
'drivers': drivers})
except exception.ConductorAlreadyRegistered:
LOG.warn(_("A conductor with hostname %(hostname)s "
"was previously registered. Updating registration")
% {'hostname': self.host})
self.dbapi.unregister_conductor(self.host)
self.dbapi.register_conductor({'hostname': self.host,
'drivers': drivers})
# TODO(deva): add stop() to call unregister_conductor
def initialize_service_hook(self, service):
pass
@ -73,9 +90,9 @@ class ConductorManager(service.PeriodicService):
notification.get('event_type'))
# TODO(deva)
def periodic_tasks(self, context):
# TODO(deva)
pass
def periodic_tasks(self, context, raise_on_error=False):
"""Periodic tasks are run at pre-specified interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def get_node_power_state(self, context, node_id):
"""Get and return the power state for a single node."""
@ -289,3 +306,7 @@ class ConductorManager(service.PeriodicService):
else:
node_obj['provision_state'] = new_state
node_obj.save(context)
@periodic_task.periodic_task
def _conductor_service_record_keepalive(self, context):
self.dbapi.touch_conductor(self.host)

View File

@ -281,3 +281,49 @@ class Connection(object):
:param chassis: The id or the uuid of a chassis.
"""
@abc.abstractmethod
def register_conductor(self, values):
"""Register a new conductor service at the specified hostname.
:param values: A dict of values which must contain the following:
{
'hostname': the unique hostname which identifies
this Conductor service.
'drivers': a list of supported drivers.
}
:returns: A conductor.
:raises: ConductorAlreadyRegistered
"""
@abc.abstractmethod
def get_conductor(self, hostname):
"""Retrieve a conductor service record from the database.
:param hostname: The hostname of the conductor service.
:returns: A conductor.
:raises: ConductorNotFound
"""
@abc.abstractmethod
def unregister_conductor(self, hostname):
"""Unregister this conductor with the service registry.
:param hostname: The hostname of this conductor service.
:raises: ConductorNotFound
"""
@abc.abstractmethod
def touch_conductor(self, hostname):
"""Mark a conductor as active by updating its 'updated_at' property.
:param hostname: The hostname of this conductor service.
:raises: ConductorNotFound
"""
@abc.abstractmethod
def list_active_conductor_drivers(self, interval):
"""Retrieve a list of drivers supported by the registered conductors.
:param interval: Time since last check-in of a conductor.
"""

View File

@ -17,6 +17,8 @@
"""SQLAlchemy storage backend."""
import datetime
from oslo.config import cfg
# TODO(deva): import MultipleResultsFound and handle it appropriately
@ -28,9 +30,11 @@ from ironic.common import utils
from ironic.db import api
from ironic.db.sqlalchemy import models
from ironic import objects
from ironic.openstack.common.db import exception as db_exc
from ironic.openstack.common.db.sqlalchemy import session as db_session
from ironic.openstack.common.db.sqlalchemy import utils as db_utils
from ironic.openstack.common import log
from ironic.openstack.common import timeutils
from ironic.openstack.common import uuidutils
CONF = cfg.CONF
@ -464,3 +468,58 @@ class Connection(api.Connection):
count = query.delete()
if count != 1:
raise exception.ChassisNotFound(chassis=chassis)
@objects.objectify(objects.Conductor)
def register_conductor(self, values):
try:
conductor = models.Conductor()
conductor.update(values)
# NOTE(deva): ensure updated_at field has a non-null initial value
if not conductor.get('updated_at'):
conductor.update({'updated_at': timeutils.utcnow()})
conductor.save()
return conductor
except db_exc.DBDuplicateEntry:
raise exception.ConductorAlreadyRegistered(
conductor=values['hostname'])
@objects.objectify(objects.Conductor)
def get_conductor(self, hostname):
try:
return model_query(models.Conductor).\
filter_by(hostname=hostname).\
one()
except NoResultFound:
raise exception.ConductorNotFound(conductor=hostname)
def unregister_conductor(self, hostname):
session = get_session()
with session.begin():
query = model_query(models.Conductor, session=session).\
filter_by(hostname=hostname)
count = query.delete()
if count == 0:
raise exception.ConductorNotFound(conductor=hostname)
def touch_conductor(self, hostname):
session = get_session()
with session.begin():
query = model_query(models.Conductor, session=session).\
filter_by(hostname=hostname)
# since we're not changing any other field, manually set updated_at
count = query.update({'updated_at': timeutils.utcnow()})
if count == 0:
raise exception.ConductorNotFound(conductor=hostname)
def list_active_conductor_drivers(self, interval):
# TODO(deva): add configurable default 'interval', somewhere higher
# up the code. This isn't a db-specific option.
limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
result = model_query(models.Conductor).\
filter(models.Conductor.updated_at >= limit).\
all()
driver_set = set()
for row in result:
driver_set.update(set(row['drivers']))
return list(driver_set)

View File

@ -0,0 +1,56 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# -*- encoding: utf-8 -*-
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from migrate.changeset import UniqueConstraint
from sqlalchemy import MetaData, Table, Column, Integer, String, Text, DateTime
from ironic.openstack.common import log as logging
LOG = logging.getLogger(__name__)
ENGINE = 'InnoDB'
CHARSET = 'utf8'
def upgrade(migrate_engine):
meta = MetaData(bind=migrate_engine)
conductor = Table('conductors', meta,
Column('id', Integer, primary_key=True, nullable=False),
Column('hostname', String(length=255), nullable=False),
Column('drivers', Text),
Column('created_at', DateTime),
Column('updated_at', DateTime),
mysql_engine=ENGINE,
mysql_charset=CHARSET,
)
try:
conductor.create()
except Exception:
LOG.info(repr(conductor))
LOG.exception(_('Exception while creating table.'))
raise
uc = UniqueConstraint('hostname',
table=conductor,
name='uniq_conductors0hostname')
uc.create()
def downgrade(migrate_engine):
raise NotImplementedError(_('Downgrade from version 012 is unsupported.'))

View File

@ -93,6 +93,18 @@ class Chassis(Base):
description = Column(String(255), nullable=True)
class Conductor(Base):
"""Represents a conductor service entry."""
__tablename__ = 'conductors'
__table_args__ = (
schema.UniqueConstraint('hostname', name='uniq_conductors0hostname'),
)
id = Column(Integer, primary_key=True)
hostname = Column(String(255), nullable=False)
drivers = Column(JSONEncodedDict)
class Node(Base):
"""Represents a bare metal node."""

View File

@ -15,6 +15,7 @@
import functools
from ironic.objects import chassis
from ironic.objects import conductor
from ironic.objects import node
from ironic.objects import port
@ -35,10 +36,12 @@ def objectify(klass):
return the_decorator
Chassis = chassis.Chassis
Conductor = conductor.Conductor
Node = node.Node
Port = port.Port
__all__ = (Chassis,
Conductor,
Node,
Port,
objectify)

View File

@ -0,0 +1,69 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# coding=utf-8
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ironic.db import api as db_api
from ironic.objects import base
from ironic.objects import utils
class Conductor(base.IronicObject):
dbapi = db_api.get_instance()
fields = {
'id': int,
'drivers': utils.list_or_none,
'hostname': str,
}
@staticmethod
def _from_db_object(conductor, db_obj):
"""Converts a database entity to a formal object."""
for field in conductor.fields:
conductor[field] = db_obj[field]
conductor.obj_reset_changes()
return conductor
@base.remotable_classmethod
def get_by_hostname(cls, context, hostname):
"""Get a Conductor record by its hostname.
:param hostname: the hostname on which a Conductor is running
:returns: a :class:`Conductor` object.
"""
db_obj = cls.dbapi.get_conductor(hostname)
return Conductor._from_db_object(cls(), db_obj)
def save(self, context):
"""Save is not supported by Conductor objects."""
raise NotImplementedError(
_('Cannot update a conductor record directly.'))
@base.remotable
def refresh(self, context):
current = self.__class__.get_by_hostname(context,
hostname=self.hostname)
for field in self.fields:
if (hasattr(self, base.get_attrname(field)) and
self[field] != current[field]):
self[field] = current[field]
@base.remotable
def touch(self, context):
"""Touch this conductor's DB record, marking it as up-to-date."""
self.dbapi.touch_conductor(self.hostname)

View File

@ -72,6 +72,19 @@ def dict_or_none(val):
return {}
def list_or_none(val):
"""Attempt to listify a value, or None."""
if val is None:
return []
elif isinstance(val, str):
return list(ast.literal_eval(val))
else:
try:
return list(val)
except ValueError:
return []
def ip_or_none(version):
"""Return a version-specific IP address validator."""
def validator(val, version=version):

View File

@ -21,6 +21,7 @@
import mock
from ironic.common import driver_factory
from ironic.common import exception
from ironic.common import states
from ironic.conductor import manager
@ -42,6 +43,38 @@ class ManagerTestCase(base.DbTestCase):
self.dbapi = dbapi.get_instance()
self.driver = mgr_utils.get_mocked_node_manager()
def test_start_registers_conductor(self):
self.assertRaises(exception.ConductorNotFound,
self.dbapi.get_conductor,
'test-host')
self.service.start()
res = self.dbapi.get_conductor('test-host')
self.assertEqual(res['hostname'], 'test-host')
def test_start_registers_driver_names(self):
init_names = ['fake1', 'fake2']
restart_names = ['fake3', 'fake4']
df = driver_factory.DriverFactory()
with mock.patch.object(df._extension_manager, 'names') as mock_names:
# verify driver names are registered
mock_names.return_value = init_names
self.service.start()
res = self.dbapi.get_conductor('test-host')
self.assertEqual(res['drivers'], init_names)
# verify that restart registers new driver names
mock_names.return_value = restart_names
self.service.start()
res = self.dbapi.get_conductor('test-host')
self.assertEqual(res['drivers'], restart_names)
def test_periodic_keepalive(self):
self.service.start()
with mock.patch.object(self.dbapi, 'touch_conductor') as mock_touch:
self.service.periodic_tasks(self.context)
mock_touch.assert_called_once_with('test-host')
def test_get_power_state(self):
n = utils.get_test_node(driver='fake')
self.dbapi.create_node(n)

View File

@ -684,3 +684,26 @@ class TestMigrations(BaseMigrationTestCase, WalkVersionsMixin):
self.assertRaises(sqlalchemy.exc.IntegrityError,
chassis.insert().execute,
{'uuid': 'uuu-111-222', 'extra': 'extra2'})
def _check_012(self, engine, data):
self.assertTrue(engine.dialect.has_table(engine.connect(),
'conductors'))
conductor = db_utils.get_table(engine, 'conductors')
conductor_data = {'hostname': 'test-host'}
conductor.insert().values(conductor_data).execute()
self.assertRaises(sqlalchemy.exc.IntegrityError,
conductor.insert().execute,
conductor_data)
# NOTE(deva): different backends raise different error here.
if isinstance(engine.dialect,
sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite):
self.assertRaises(sqlalchemy.exc.IntegrityError,
conductor.insert().execute,
{'hostname': None})
if isinstance(engine.dialect,
sqlalchemy.dialects.mysql.pymysql.MySQLDialect_pymysql):
self.assertRaises(sqlalchemy.exc.OperationalError,
conductor.insert().execute,
{'hostname': None})
# FIXME: add check for postgres

View File

@ -0,0 +1,111 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for manipulating Conductors via the DB API"""
import datetime
from ironic.openstack.common import timeutils
from ironic.common import exception
from ironic.db import api as dbapi
from ironic.tests.db import base
from ironic.tests.db import utils
class DbConductorTestCase(base.DbTestCase):
def setUp(self):
super(DbConductorTestCase, self).setUp()
self.dbapi = dbapi.get_instance()
timeutils.set_time_override()
self.addCleanup(timeutils.clear_time_override)
def _create_test_cdr(self, **kwargs):
c = utils.get_test_conductor(**kwargs)
return self.dbapi.register_conductor(c)
def test_register_conductor(self):
self._create_test_cdr(id=1)
self.assertRaises(
exception.ConductorAlreadyRegistered,
self._create_test_cdr,
id=2)
def test_get_conductor(self):
c1 = self._create_test_cdr()
c2 = self.dbapi.get_conductor(c1['hostname'])
self.assertEqual(c1['id'], c2['id'])
def test_get_conductor_not_found(self):
self._create_test_cdr()
self.assertRaises(
exception.ConductorNotFound,
self.dbapi.get_conductor,
'bad-hostname')
def test_unregister_conductor(self):
c = self._create_test_cdr()
self.dbapi.unregister_conductor(c['hostname'])
self.assertRaises(
exception.ConductorNotFound,
self.dbapi.unregister_conductor,
c['hostname'])
def test_touch_conductor(self):
t = datetime.datetime(2000, 1, 1, 0, 0)
timeutils.set_time_override(override_time=t)
c = self._create_test_cdr(updated_at=t)
self.assertEqual(t, timeutils.normalize_time(c['updated_at']))
t = datetime.datetime(2000, 1, 1, 0, 1)
timeutils.set_time_override(override_time=t)
self.dbapi.touch_conductor(c['hostname'])
c = self.dbapi.get_conductor(c['hostname'])
self.assertEqual(t, timeutils.normalize_time(c['updated_at']))
def test_touch_conductor_not_found(self):
self._create_test_cdr()
self.assertRaises(
exception.ConductorNotFound,
self.dbapi.touch_conductor,
'bad-hostname')
def test_list_active_conductor_drivers(self):
# create some conductors with different timestamps
now = datetime.datetime(2000, 1, 1, 0, 0)
then = now + datetime.timedelta(hours=1)
d1 = [u'not-this-one']
timeutils.set_time_override(override_time=now)
self._create_test_cdr(id=1, hostname='d1', drivers=d1)
d2 = [u'foo', u'bar']
d3 = [u'another']
timeutils.set_time_override(override_time=then)
self._create_test_cdr(id=2, hostname='d2', drivers=d2)
self._create_test_cdr(id=3, hostname='d3', drivers=d3)
# verify that res contains d2 and d3, but not the old d1
res = self.dbapi.list_active_conductor_drivers(interval=60)
drivers = d2 + d3
self.assertEqual(sorted(res), sorted(drivers))
# change the interval, and verify that d1 appears
res = self.dbapi.list_active_conductor_drivers(interval=7200)
drivers = d1 + d2 + d3
self.assertEqual(sorted(res), sorted(drivers))

View File

@ -20,7 +20,6 @@ from ironic.common import states
from ironic.openstack.common import jsonutils as json
fake_info = {"foo": "bar"}
ipmi_info = json.dumps(
@ -79,8 +78,8 @@ def get_test_node(**kw):
'properties': kw.get('properties', properties),
'reservation': None,
'extra': kw.get('extra', {}),
'updated_at': None,
'created_at': None,
'updated_at': kw.get('created_at'),
'created_at': kw.get('updated_at'),
}
return node
@ -110,3 +109,15 @@ def get_test_chassis(**kw):
}
return chassis
def get_test_conductor(**kw):
conductor = {
'id': kw.get('id', 6),
'hostname': kw.get('hostname', 'test-conductor-node'),
'drivers': kw.get('drivers', ['fake-driver', 'null-driver']),
'created_at': kw.get('created_at'),
'updated_at': kw.get('updated_at'),
}
return conductor

View File

@ -0,0 +1,116 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# coding=utf-8
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
from ironic.db import api as db_api
from ironic.db.sqlalchemy import models
from ironic import objects
from ironic.objects import utils as obj_utils
from ironic.openstack.common import timeutils
from ironic.tests.db import base
from ironic.tests.db import utils
class TestConductorObject(base.DbTestCase):
def setUp(self):
super(TestConductorObject, self).setUp()
self.fake_conductor = utils.get_test_conductor(
updated_at=timeutils.utcnow())
self.dbapi = db_api.get_instance()
def test_load(self):
host = self.fake_conductor['hostname']
with mock.patch.object(self.dbapi, 'get_conductor',
autospec=True) as mock_get_cdr:
mock_get_cdr.return_value = self.fake_conductor
objects.Conductor.get_by_hostname(self.context, host)
mock_get_cdr.assert_called_once_with(host)
def test_save(self):
host = self.fake_conductor['hostname']
with mock.patch.object(self.dbapi, 'get_conductor',
autospec=True) as mock_get_cdr:
mock_get_cdr.return_value = self.fake_conductor
c = objects.Conductor.get_by_hostname(self.context, host)
c.hostname = 'another-hostname'
self.assertRaises(NotImplementedError,
c.save, self.context)
mock_get_cdr.assert_called_once_with(host)
def test_touch(self):
host = self.fake_conductor['hostname']
with mock.patch.object(self.dbapi, 'get_conductor',
autospec=True) as mock_get_cdr:
with mock.patch.object(self.dbapi, 'touch_conductor',
autospec=True) as mock_touch_cdr:
mock_get_cdr.return_value = self.fake_conductor
c = objects.Conductor.get_by_hostname(self.context, host)
c.touch(self.context)
mock_get_cdr.assert_called_once_with(host)
mock_touch_cdr.assert_called_once_with(host)
def test_refresh(self):
host = self.fake_conductor['hostname']
t0 = self.fake_conductor['updated_at']
t1 = t0 + datetime.timedelta(seconds=10)
returns = [dict(self.fake_conductor, updated_at=t0),
dict(self.fake_conductor, updated_at=t1)]
expected = [mock.call(host), mock.call(host)]
with mock.patch.object(self.dbapi, 'get_conductor',
side_effect=returns,
autospec=True) as mock_get_cdr:
c = objects.Conductor.get_by_hostname(self.context, host)
# ensure timestamps have tzinfo
self.assertEqual(c.updated_at, obj_utils.datetime_or_none(t0))
c.refresh()
self.assertEqual(c.updated_at, obj_utils.datetime_or_none(t1))
self.assertEqual(mock_get_cdr.call_args_list, expected)
def test_objectify(self):
def _get_db_conductor():
c = models.Conductor()
c.update(self.fake_conductor)
return c
@objects.objectify(objects.Conductor)
def _convert_db_conductor():
return _get_db_conductor()
self.assertIsInstance(_get_db_conductor(), models.Conductor)
self.assertIsInstance(_convert_db_conductor(), objects.Conductor)
def test_objectify_many(self):
def _get_db_conductors():
conductors = []
for i in xrange(5):
c = models.Conductor()
c.update(self.fake_conductor)
conductors.append(c)
return conductors
@objects.objectify(objects.Conductor)
def _convert_db_conductors():
return _get_db_conductors()
for c in _get_db_conductors():
self.assertIsInstance(c, models.Conductor)
for c in _convert_db_conductors():
self.assertIsInstance(c, objects.Conductor)