SQLAlchemy 2.0 Support

Primarily, remove the workaround added in
Ia6d512ff2ae417bab938cb095fbb0884d195010a, which preserved the
use of autocommit semantics that are incompatible with
SQLAlchemy 2.0.

Also set the environment for unit tests to report compatibility
warnings, although it appears none are being reported at this time.

Also cut out the db upgrade cruft so that only the online database
migration code is used, through oslo_db's enginefacade, which has
the smarts to handle online or offline migrations.

Then, retool unit/functional test data storage to utilize SQLite,
and as part of that re-tooling rework the queries to prevent locking
conditions, along with some additional refactoring/cleanup.

Also, don't mock and test time.sleep().

Additionally, it looks like we have discovered the root cause of
the memory/connection leakage issue which has been observed: the
way lists of nodes were processed and returned.

This change is based upon the work in
I506da42a9891a245831f325e34bec92e0a3f33f0, which is included in
this commit, as the entire database structure and interaction
layer for ironic-inspector has been modified.

Co-Authored-By: aarefiev <aarefiev@mirantis.com>
Story: 2009727
Task: 44132
Change-Id: Ic88eb9dec5fddc924a72d9a23c17a304954ebf46
Anton Arefiev, 2017-04-03 16:38:47 +03:00; committed by Julia Kreger
commit 3fe42b53fd, parent 664aa3fb4c
41 changed files with 1477 additions and 761 deletions

View File

@@ -60,7 +60,7 @@ command_opt = cfg.SubCommandOpt('command',
def _get_alembic_config():
base_path = os.path.split(os.path.dirname(__file__))[0]
return alembic_config.Config(os.path.join(base_path, 'alembic.ini'))
return alembic_config.Config(os.path.join(base_path, 'db/alembic.ini'))
def do_revision(config, cmd, *args, **kwargs):
@@ -85,7 +85,7 @@ def main(args=sys.argv[1:]):
CONF.register_cli_opt(command_opt)
CONF(args, project='ironic-inspector')
config = _get_alembic_config()
config.set_main_option('script_location', "ironic_inspector:migrations")
config.set_main_option('script_location', "ironic_inspector.db:migrations")
config.ironic_inspector_config = CONF
CONF.command.func(config, CONF.command.name)

View File

@@ -28,7 +28,7 @@ from ironic_inspector.common import coordination
from ironic_inspector.common.i18n import _
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector.common import keystone
from ironic_inspector import db
from ironic_inspector.db import api as dbapi
from ironic_inspector import introspect
from ironic_inspector import node_cache
from ironic_inspector.plugins import base as plugins_base
@@ -52,6 +52,7 @@ class ConductorManager(object):
self._zeroconf = None
self._shutting_down = semaphore.Semaphore()
self.coordinator = None
self.dbapi = None
def init_host(self):
"""Initialize Worker host
@@ -69,7 +70,8 @@ class ConductorManager(object):
LOG.info('Introspection data will be stored in the %s backend',
CONF.processing.store_data)
db.init()
if not self.dbapi:
self.dbapi = dbapi.init()
self.coordinator = None
try:
@@ -169,6 +171,8 @@ class ConductorManager(object):
self._zeroconf.close()
self._zeroconf = None
self.dbapi = None
self._shutting_down.release()
LOG.info('Shut down successfully')

View File

@@ -0,0 +1,19 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_db.sqlalchemy import enginefacade
# TODO(aarefiev): enable foreign keys for SQLite once all unit
# tests with failed constraints are fixed.
# FIXME(stephenfin): we need to remove reliance on autocommit semantics ASAP
# since it's not compatible with SQLAlchemy 2.0
enginefacade.configure(sqlite_fk=False)
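
For contrast (an illustrative before/after; the old line appears in the
removed model.py code later in this diff), the previous context-manager
setup relied on the oslo.db autocommit shim, which the new package-level
configuration drops:

    # old (removed): relies on autocommit, incompatible with SQLAlchemy 2.0
    _ctx_mgr.configure(sqlite_fk=False, __autocommit=True)

    # new: plain enginefacade configuration
    enginefacade.configure(sqlite_fk=False)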

ironic_inspector/db/api.py (new file, 637 additions)
View File

@@ -0,0 +1,637 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""DB models API for inspection data and shared database code."""
import threading
import time
from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db import options as db_opts
from oslo_db.sqlalchemy import enginefacade
from oslo_utils import timeutils
from oslo_utils import uuidutils
from sqlalchemy import delete
from sqlalchemy import insert
from sqlalchemy import or_, and_
from sqlalchemy import orm
from sqlalchemy.orm import exc as orm_errors
from sqlalchemy import update
from ironic_inspector.common.i18n import _
from ironic_inspector.db import model
from ironic_inspector import utils
LOG = utils.getProcessingLogger(__name__)
_DEFAULT_SQL_CONNECTION = 'sqlite:///ironic_inspector.sqlite'
_CONTEXT = threading.local()
db_opts.set_defaults(cfg.CONF, connection=_DEFAULT_SQL_CONNECTION)
CONF = cfg.CONF
def init():
"""Initialize the database.
Called on service start up; initializes the transaction
context manager and tries to create a db session.
"""
get_writer_session()
def model_query(model, *args, **kwargs):
"""Query helper for simpler session usage.
Note: the query is constructed inside a short-lived read session.
"""
with session_for_read() as session:
query = session.query(model, *args)
return query
def get_writer_session():
"""Help method to get writer session.
:returns: The writer session.
"""
return enginefacade.writer.using(_CONTEXT)
def session_for_read():
"""Create read session within context manager"""
return enginefacade.reader.using(_CONTEXT)
def session_for_write():
"""Create write session within context manager"""
return enginefacade.writer.using(_CONTEXT)
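# Usage sketch (illustrative, not part of this module's API surface):
# both helpers return enginefacade context managers; the transaction
# is committed (writer) or simply closed (reader) on block exit, e.g.:
#
#     with session_for_write() as session:
#         session.add(model.Node(uuid=some_uuid, state='starting'))
#
# where `some_uuid` is a placeholder value.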
def get_nodes():
"""Get list of cached nodes
:returns: list of nodes, could be empty
"""
with session_for_read() as session:
res = session.query(
model.Node
).order_by(
model.Node.started_at.desc()
)
return [model.Node(uuid=entry.uuid, version_id=entry.version_id,
state=entry.state, started_at=entry.started_at,
finished_at=entry.finished_at, error=entry.error,
manage_boot=entry.manage_boot)
for entry in res.all()]
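# NOTE: copying rows into fresh, detached model.Node objects (instead
# of returning live ORM rows bound to the session) is what lets the
# read session close cleanly; per the commit message, returning live
# rows was the suspected root cause of the memory/connection leakage.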
def get_node(uuid, **fields):
"""Get all cached nodes
:param uuid: node uuid
:param fields: fields are used as filtering criterion
:returns: get node object
:raises: NodeNotFoundInDBError in case the node is not found or its
version differs from the passed-in fields.
"""
try:
with session_for_read() as session:
res = session.query(model.Node).filter_by(
uuid=uuid, **fields).one()
return model.Node(uuid=res.uuid, version_id=res.version_id,
state=res.state, started_at=res.started_at,
finished_at=res.finished_at, error=res.error,
manage_boot=res.manage_boot)
except (orm_errors.NoResultFound, orm_errors.StaleDataError):
raise utils.NodeNotFoundInDBError()
def get_active_nodes(started_before=None):
"""Get list of nodes on introspection
:param started_before: datetime object; only return nodes
started before this time
:returns: list of nodes, could be empty
"""
with session_for_read() as session:
query = session.query(model.Node).filter_by(
finished_at=None).order_by(model.Node.started_at.desc())
if started_before:
query = query.filter(model.Node.started_at < started_before)
return [model.Node(uuid=entry.uuid, version_id=entry.version_id,
state=entry.state, started_at=entry.started_at,
finished_at=entry.finished_at, error=entry.error,
manage_boot=entry.manage_boot)
for entry in query.all()]
def list_nodes_by_attributes(attributes):
"""Get list of nodes with certain attributes
:param attributes: list of attributes as (name, value) pair
:returns: list of nodes, could be empty
"""
attr_filters = []
for name, value in attributes:
attr_filters.append(and_(model.Attribute.name == name,
model.Attribute.value == value))
with session_for_read() as session:
query = session.query(
model.Attribute
).filter(or_(*attr_filters)).all()
result = [model.Attribute(uuid=attr.uuid, node_uuid=attr.node_uuid,
name=attr.name, value=attr.value)
for attr in query]
return result
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def update_node(uuid, **values):
"""Update node by uuid
Updates node fields with provided values, also bump node version.
:param uuid: node uuid
:param values: node fields with values to be updated
:raises: NodeNotFoundInDBError in case the node is not found or its
version differs from the passed-in values.
"""
fields_ver = values.copy()
with session_for_write() as session:
stmt = update(
model.Node
).where(
model.Node.uuid == uuid
).values(
fields_ver
).execution_options(
synchronize_session=False
)
res = session.execute(stmt)
if res.rowcount == 0:
raise utils.NodeNotFoundInDBError()
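# NOTE: oslo.db's wrap_db_retry transparently re-invokes the decorated
# function on deadlock errors (up to max_retries); the rowcount check
# above effectively replaces the old version_id-based race detection.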
def create_node(uuid, state, started_at=None, finished_at=None,
error=None, manage_boot=None):
"""Create new node
:param uuid: node uuid
:param state: initial node state
:param started_at: node caching datetime
:param finished_at: introspection finished datetime
:param error: introspection error
:returns: created node object
"""
node = model.Node(uuid=uuid, state=state, started_at=started_at,
finished_at=finished_at,
error=error, manage_boot=manage_boot)
with session_for_write() as session:
session.add(node)
return node
def add_node(uuid, state, started_at=None, finished_at=None,
error=None, manage_boot=None):
"""Add new node
Before creating a new node with the given uuid, clean up all
existing info for that node.
:param uuid: node uuid
:param state: initial node state
:param started_at: node caching datetime
:param finished_at: introspection finished datetime
:param error: introspection error
:param manage_boot: whether to manage boot for this node
:returns: created node object
"""
with session_for_write() as session:
# Delete attribute data
session.execute(
delete(model.Attribute).where(
model.Attribute.node_uuid == uuid))
# Delete introspection data
session.execute(
delete(model.Option).where(
model.Option.uuid == uuid))
session.execute(
delete(model.IntrospectionData).where(
model.IntrospectionData.uuid == uuid))
# Delete the actual node
session.execute(
delete(model.Node).where(
model.Node.uuid == uuid
).execution_options(synchronize_session=False)
)
node = model.Node(uuid=uuid, state=state, started_at=started_at,
finished_at=finished_at, error=error,
manage_boot=manage_boot)
session.add(node)
return node
def list_nodes_options_by_uuid(uuid):
"""Get list of node options
:param uuid: node uuid
:returns: list of node options, could be empty
"""
with session_for_read() as session:
query = session.query(model.Option).filter(model.Option.uuid == uuid)
return [model.Option(uuid=opt.uuid, name=opt.name, value=opt.value)
for opt in query.all()]
def delete_node(uuid):
"""Delete node and its attributes
:param uuid: node uuid
:returns: None
"""
with session_for_write() as session:
# Delete attribute data
session.execute(
delete(model.Attribute).where(
model.Attribute.node_uuid == uuid))
# Delete introspection data
session.execute(
delete(model.Option).where(
model.Option.uuid == uuid))
session.execute(
delete(model.IntrospectionData).where(
model.IntrospectionData.uuid == uuid))
# Delete the actual node
session.execute(
delete(model.Node).where(
model.Node.uuid == uuid
).execution_options(synchronize_session=False)
)
def delete_nodes(finished_until=None):
"""Delete all nodes
:param finished_until: datetime object; only delete nodes whose
introspection finished before this time
:returns: None
"""
with session_for_read() as session:
query = session.query(model.Node.uuid)
if finished_until:
query = query.filter(
model.Node.finished_at.isnot(None),
model.Node.finished_at < finished_until)
uuid_list = []
for node in query.all():
# This breaks the requests up and allows proper value
# deletion since there are structural dependencies for
# nodes in other tables. Performance-wise this is a little
# slower overall, but it doesn't cause the tables to be
# locked, and handles the other tables without building
# DB triggers.
uuid_list.append(node[0])
for uuid in uuid_list:
delete_node(uuid)
# Allow the Python GIL to let something else run, and
# give the DB a chance to breathe.
time.sleep(0)
def set_option(node_uuid, name, value):
"""Set option for node
:param node_uuid: node uuid
:param name: option name
:param value: option value
:returns: None
"""
with session_for_write() as session:
opt = model.Option(uuid=node_uuid, name=name, value=value)
session.add(opt)
def delete_options(**filters):
"""Delete all options
:param filters: deletion filter criteria
:returns: None
"""
with session_for_write() as session:
session.query(model.Option).filter_by(**filters).delete()
def set_attribute(node_uuid, name, values):
"""Set lookup attributes for node
:param node_uuid: node uuid
:param name: option name
:param values: list of attribute values
:returns: None
"""
if not isinstance(values, list):
values = [values]
with session_for_write() as session:
for value in values:
attr = model.Attribute(node_uuid=node_uuid,
uuid=uuidutils.generate_uuid(),
name=name, value=value)
session.add(attr)
def delete_attributes(uuid):
"""Delete all attributes
:param uuid: the UUID of the node whose attributes you wish
to delete
:returns: None
"""
# FIXME(TheJulia): This is going to be difficult to match
# in later versions of sqlalchemy since query needs to move
# to use the object model instead of free form attribute name.
with session_for_write() as session:
session.execute(
delete(model.Attribute).where(
model.Attribute.node_uuid == uuid))
def get_attributes(order_by=None, **fields):
"""Get all attributes
:param order_by: ordering criterion
:param fields: filter criteria fields
:returns: list of attributes
"""
# FIXME(TheJulia) This needs to be rewritten
with session_for_read() as session:
query = session.query(model.Attribute).filter_by(**fields)
if order_by:
orders = [getattr(model.Attribute, key) for key in order_by]
query = query.order_by(*orders)
res = query.all()
result = [model.Attribute(uuid=attr.uuid, node_uuid=attr.node_uuid,
name=attr.name, value=attr.value)
for attr in res]
return result
def get_options(**fields):
"""Get all options
:param fields: filter criteria fields
:returns: list of options
"""
return model_query(model.Option).filter_by(**fields).all()
def create_rule(uuid, conditions, actions, description=None,
scope=None):
"""Create new rule
:param uuid: rule uuid
:param conditions: list of (field, op, multiple, invert, params) tuple,
which represents condition object
:param actions: list of (action, params) pair, which represents action
object
:param description: rule description
:param scope: rule scope
:returns: created rule
"""
try:
with session_for_write() as session:
rule = model.Rule(
uuid=uuid, description=description,
disabled=False, created_at=timeutils.utcnow(), scope=scope)
rule.conditions = rule.actions = []
for field, op, multiple, invert, params in conditions:
rule.conditions.append(model.RuleCondition(op=op,
field=field,
multiple=multiple,
invert=invert,
params=params))
for action, params in actions:
rule.actions.append(model.RuleAction(action=action,
params=params))
session.add(rule)
except db_exc.DBDuplicateEntry as exc:
LOG.error('Database integrity error %s when creating a rule', exc)
raise utils.RuleUUIDExistError(uuid)
return rule
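# Example condition/action tuples for create_rule (illustrative
# values only; the plugin names are assumptions for the sketch):
#   conditions = [('memory_mb', 'ge', False, False, {'value': 1024})]
#   actions = [('set-attribute', {'path': '/driver', 'value': 'fake'})]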
def get_rule(uuid):
"""Get rule by uuid
:param uuid: rule uuid
:returns: rule object
"""
try:
with session_for_read() as session:
query = session.query(model.Rule).where(
model.Rule.uuid == uuid)
rule = query.one()
return model.Rule(uuid=rule.uuid, created_at=rule.created_at,
description=rule.description,
disabled=rule.disabled, scope=rule.scope,
conditions=rule.conditions, actions=rule.actions)
except orm.exc.NoResultFound:
raise utils.RuleNotFoundError(uuid)
def get_rules(**fields):
"""List all rules."""
with session_for_read() as session:
query = session.query(
model.Rule
).filter_by(
**fields
).order_by(
model.Rule.created_at
)
return [model.Rule(
uuid=rule.uuid,
actions=rule.actions,
conditions=rule.conditions,
description=rule.description,
scope=rule.scope)
for rule in query]
def get_rules_conditions(**fields):
"""Get all rule conditions
:param fields: field filter criteria
:returns: list of conditions
"""
# NOTE(TheJulia): This appears to exist largely to help unit
# testing of rules functionality.
with session_for_read() as session:
query = session.query(
model.RuleCondition
).filter_by(**fields)
return [model.RuleCondition(
id=condition.id,
rule=condition.rule,
op=condition.op,
multiple=condition.multiple,
invert=condition.invert,
field=condition.field,
params=condition.params)
for condition in query.all()]
def get_rules_actions(**fields):
"""Get all rule actions
:param fields: field filter criteria
:returns: list of actions
"""
# NOTE(TheJulia): This appears to exist largely to help unit
# testing of rules functionality.
with session_for_read() as session:
query = session.query(
model.RuleAction
).filter_by(**fields)
return [model.RuleAction(
id=action.id,
rule=action.rule,
action=action.action,
params=action.params)
for action in query.all()]
def delete_rule(uuid):
"""Delete the rule by uuid
:param uuid: rule uuid
:raises: RuleNotFoundError in case rule not found
:returns: None
"""
with session_for_write() as session:
stmt = (
delete(
model.RuleAction
).where(
model.RuleAction.rule == uuid
).execution_options(synchronize_session=False)
)
session.execute(stmt)
stmt = (
delete(
model.RuleCondition
).where(
model.RuleCondition.rule == uuid
).execution_options(synchronize_session=False)
)
session.execute(stmt)
stmt = (
delete(
model.Rule
).where(
model.Rule.uuid == uuid
).execution_options(synchronize_session=False)
)
res = session.execute(stmt)
if res.rowcount == 0:
raise utils.RuleNotFoundError(uuid)
def delete_all_rules():
"""Delete all rules
:returns: None
"""
with session_for_write() as session:
session.execute(
delete(model.RuleAction).execution_options(
synchronize_session=False
)
)
session.execute(
delete(model.RuleCondition).execution_options(
synchronize_session=False
)
)
session.execute(
delete(model.Rule).execution_options(
synchronize_session=False
)
)
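# NOTE: enginefacade's writer context commits automatically on exit,
# so this explicit commit is believed to be redundant.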
session.commit()
def store_introspection_data(node_id, introspection_data,
processed=True):
"""Store introspection data for this node.
:param node_id: node UUID.
:param introspection_data: A dictionary of introspection data
:param processed: Specify the type of introspection data; set to
False to indicate the data is unprocessed.
"""
updated = False
with session_for_write() as session:
record = session.query(model.IntrospectionData).filter_by(
uuid=node_id, processed=processed).first()
if record:
record.update({'data': introspection_data})
updated = True
else:
# by default, all write sessions are committed. In this
# case, we can safely rollback. Once we rollback, we
# launch a new session.
session.rollback()
if not updated:
with session_for_write() as session:
stmt = insert(model.IntrospectionData).values(
{'uuid': node_id, 'processed': processed,
'data': introspection_data}
)
session.execute(stmt)
def get_introspection_data(node_id, processed=True):
"""Get introspection data for this node.
:param node_id: node UUID.
:param processed: Specify the type of introspection data; set to
False to retrieve the unprocessed data.
:return: A dictionary representation of introspected data
"""
try:
with session_for_read() as session:
ref = session.query(model.IntrospectionData).filter_by(
uuid=node_id, processed=processed).one()
res = ref['data']
return res
except orm_errors.NoResultFound:
msg = _('Introspection data not found for node %(node)s, '
'processed=%(processed)s') % {'node': node_id,
'processed': processed}
raise utils.IntrospectionDataNotFound(msg)
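
A hedged usage sketch for the new DB API module above (the uuid is a
placeholder value):

    from ironic_inspector.db import api as dbapi

    dbapi.init()  # called on service start-up
    node = dbapi.create_node('1be26c0b-03f2-4d2e-ae87-c02d7f33c123',
                             state='starting')
    dbapi.set_option(node.uuid, 'foo', '"bar"')
    for opt in dbapi.list_nodes_options_by_uuid(node.uuid):
        print(opt.name, opt.value)
    dbapi.delete_node(node.uuid)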

View File

@@ -0,0 +1,118 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# NOTE(TheJulia): This file was primarily taken from
# https://github.com/openstack/ironic/blob/af0e5ee096fa237290776969a37f3bced96b7456/ironic/db/sqlalchemy/migration.py
# during the Antelope cycle; however, that file was last modified in
# 2017, during the Queens development cycle.
# It specifically provides a clean and consistent way to invoke alembic calls.
import os
import alembic
from alembic import config as alembic_config
import alembic.migration as alembic_migration
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import enginefacade
from ironic_inspector.db import model
def _alembic_config():
path = os.path.join(os.path.dirname(__file__), 'alembic.ini')
config = alembic_config.Config(path)
return config
def version(config=None, engine=None):
"""Current database version.
:returns: Database version
:rtype: string
"""
if engine is None:
engine = enginefacade.writer.get_engine()
with engine.connect() as conn:
context = alembic_migration.MigrationContext.configure(conn)
return context.get_current_revision()
def upgrade(revision, config=None):
"""Used for upgrading database.
:param revision: Desired database revision
:type revision: string
"""
revision = revision or 'head'
config = config or _alembic_config()
alembic.command.upgrade(config, revision)
def create_schema(config=None, engine=None):
"""Create database schema from models description.
Can be used for initial installation instead of upgrade('head').
"""
if engine is None:
engine = enginefacade.writer.get_engine()
# NOTE(viktors): If we use metadata.create_all() on a non-empty db
# schema, it will only add the new tables and leave existing ones
# as-is, so we should avoid that situation.
if version(engine=engine) is not None:
raise db_exc.DBMigrationError("DB schema is already under version"
" control. Use upgrade() instead")
model.Base.metadata.create_all(engine)
stamp('head', config=config)
def downgrade(revision, config=None):
"""Used for downgrading database.
:param revision: Desired database revision
:type revision: string
"""
revision = revision or 'base'
config = config or _alembic_config()
return alembic.command.downgrade(config, revision)
def stamp(revision, config=None):
"""Stamps database with provided revision.
Don't run any migrations.
:param revision: Should match one from repository or head - to stamp
database with most recent revision
:type revision: string
"""
config = config or _alembic_config()
return alembic.command.stamp(config, revision=revision)
def revision(message=None, autogenerate=False, config=None):
"""Creates template for migration.
:param message: Text that will be used for migration title
:type message: string
:param autogenerate: If True - generates diff based on current database
state
:type autogenerate: bool
"""
config = config or _alembic_config()
return alembic.command.revision(config, message=message,
autogenerate=autogenerate)
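
A minimal sketch of driving these helpers directly (assumes the
database connection is already configured, as the dbsync command
arranges):

    from ironic_inspector.db import migration

    if migration.version() is None:
        # fresh install: create tables from the models and stamp 'head'
        migration.create_schema()
    else:
        # existing install: walk the alembic migrations
        migration.upgrade('head')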

View File

@@ -15,13 +15,13 @@
from logging.config import fileConfig
from alembic import context
from oslo_db.sqlalchemy import enginefacade
from ironic_inspector import db
from ironic_inspector.db import model
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
ironic_inspector_config = config.ironic_inspector_config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
@@ -31,7 +31,7 @@ fileConfig(config.config_file_name)
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = db.Base.metadata
target_metadata = model.Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
@@ -39,26 +39,6 @@ target_metadata = db.Base.metadata
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = ironic_inspector_config.database.connection
context.configure(
url=url, target_metadata=target_metadata, literal_binds=True)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
@@ -66,18 +46,14 @@ def run_migrations_online():
and associate a connection with the context.
"""
session = db.get_writer_session()
with session.connection() as connection:
engine = enginefacade.writer.get_engine()
with engine.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
run_migrations_online()

View File

@@ -0,0 +1,59 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Introducing the aborting state
Revision ID: 18440d0834af
Revises: 882b2d84cb1b
Create Date: 2017-12-11 15:40:13.905554
"""
# from alembic import op
# import sqlalchemy as sa
# from sqlalchemy import sql
# from ironic_inspector import introspection_state as istate
# revision identifiers, used by Alembic.
revision = '18440d0834af'
down_revision = '882b2d84cb1b'
branch_labels = None
depends_on = None
# old_state = sa.Enum(*(set(istate.States.all()) - {istate.States.aborting}),
# name='node_state')
# new_state = sa.Enum(*istate.States.all(), name='node_state')
# Node = sql.table('nodes', sql.column('state', old_state))
def upgrade():
# NOTE(TheJulia): Batch table alteration generally works by making
# a copy of the table, modifying the copy, and dropping the original,
# and renaming the updated copy to be the same name. Unfortunately
# this results in a "FOREIGN KEY constraint failed" error in
# newer versions of inspector as two additional tables now exist
# which did not exist when the upgrade was written.
#
# with op.batch_alter_table('nodes') as batch_op:
# batch_op.alter_column('state', existing_type=old_state,
# type_=new_state)
#
# The prior net effect was that the field was being altered to
# be varchar(10), to house the largest enum value, except the added
# value only requires varchar(9), so this is sort of entirely
# redundant at this point. For what it is worth, while
# batch_alter_table *does* include an option to prevent
# recreation, column widths cannot be modified dynamically with
# SQLite, which results in a different exception if attempted.
pass

View File

@@ -15,12 +15,7 @@
"""SQLAlchemy models for inspection data and shared database code."""
import contextlib
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_db import options as db_opts
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import models
from oslo_db.sqlalchemy import types as db_types
from sqlalchemy import (Boolean, Column, DateTime, Enum, ForeignKey,
@@ -28,7 +23,6 @@ from sqlalchemy import (Boolean, Column, DateTime, Enum, ForeignKey,
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import orm
from ironic_inspector import introspection_state as istate
@@ -38,18 +32,15 @@ class ModelBase(models.ModelBase):
Base = declarative_base(cls=ModelBase)
CONF = cfg.CONF
_DEFAULT_SQL_CONNECTION = 'sqlite:///ironic_inspector.sqlite'
_CTX_MANAGER = None
db_opts.set_defaults(CONF, connection=_DEFAULT_SQL_CONNECTION)
_synchronized = lockutils.synchronized_with_prefix("ironic-inspector-")
class Node(Base):
__tablename__ = 'nodes'
uuid = Column(String(36), primary_key=True)
# NOTE(TheJulia): Version ID use has been removed from inspector.
# At some point, we can safely remove the column, but most likely
# the project will encourage migration to a combined service as
# opposed to a standalone service.
version_id = Column(String(36), server_default='')
state = Column(Enum(*istate.States.all()), nullable=False,
default=istate.States.finished,
@@ -59,14 +50,6 @@ class Node(Base):
error = Column(Text, nullable=True)
manage_boot = Column(Boolean, nullable=True, default=True)
# version_id is being tracked in the NodeInfo object
# for the sake of consistency. See also SQLAlchemy docs:
# http://docs.sqlalchemy.org/en/latest/orm/versioning.html
__mapper_args__ = {
'version_id_col': version_id,
'version_id_generator': False,
}
class Attribute(Base):
__tablename__ = 'attributes'
@@ -141,69 +124,3 @@ class IntrospectionData(Base):
processed = Column(Boolean, default=False, primary_key=True)
data = Column(db_types.JsonEncodedDict(mysql_as_long=True),
nullable=True)
def init():
"""Initialize the database.
Method called on service start up, initialize transaction
context manager and try to create db session.
"""
get_writer_session()
def model_query(model, *args, **kwargs):
"""Query helper for simpler session usage.
:param session: if present, the session to use
"""
session = kwargs.get('session') or get_reader_session()
query = session.query(model, *args)
return query
@contextlib.contextmanager
def ensure_transaction(session=None):
session = session or get_writer_session()
with session.begin(subtransactions=True):
yield session
@_synchronized("transaction-context-manager")
def _create_context_manager():
_ctx_mgr = enginefacade.transaction_context()
# TODO(aarefiev): enable foreign keys for SQLite once all unit
# tests with failed constraint will be fixed.
# FIXME(dtantsur): we need to remove reliance on autocommit semantics ASAP
# since it's not compatible with SQLAlchemy 2.0
_ctx_mgr.configure(sqlite_fk=False, __autocommit=True)
return _ctx_mgr
def get_context_manager():
"""Create transaction context manager lazily.
:returns: The transaction context manager.
"""
global _CTX_MANAGER
if _CTX_MANAGER is None:
_CTX_MANAGER = _create_context_manager()
return _CTX_MANAGER
def get_reader_session():
"""Help method to get reader session.
:returns: The reader session.
"""
return get_context_manager().reader.get_sessionmaker()()
def get_writer_session():
"""Help method to get writer session.
:returns: The writer session.
"""
return get_context_manager().writer.get_sessionmaker()()

View File

@@ -1,43 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Introducing the aborting state
Revision ID: 18440d0834af
Revises: 882b2d84cb1b
Create Date: 2017-12-11 15:40:13.905554
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import sql
from ironic_inspector import introspection_state as istate
# revision identifiers, used by Alembic.
revision = '18440d0834af'
down_revision = '882b2d84cb1b'
branch_labels = None
depends_on = None
old_state = sa.Enum(*(set(istate.States.all()) - {istate.States.aborting}),
name='node_state')
new_state = sa.Enum(*istate.States.all(), name='node_state')
Node = sql.table('nodes', sql.column('state', old_state))
def upgrade():
with op.batch_alter_table('nodes') as batch_op:
batch_op.alter_column('state', existing_type=old_state,
type_=new_state)

View File

@@ -19,7 +19,6 @@ import copy
import datetime
import functools
import json
import operator
from automaton import exceptions as automaton_errors
from openstack import exceptions as os_exc
@@ -29,12 +28,13 @@ from oslo_utils import excutils
from oslo_utils import reflection
from oslo_utils import timeutils
from oslo_utils import uuidutils
from sqlalchemy.orm import exc as orm_errors
from sqlalchemy import delete
from ironic_inspector.common.i18n import _
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector.common import locking
from ironic_inspector import db
from ironic_inspector.db import api as db
from ironic_inspector.db import model as db_model
from ironic_inspector import introspection_state as istate
from ironic_inspector import utils
@@ -60,6 +60,9 @@ class NodeInfo(object):
self.finished_at = finished_at
self.error = error
self.invalidate_cache()
# NOTE(TheJulia): version_id is unused at this time and can be
# removed at a later point in time. Primarily it remains for
# compatibility.
self._version_id = version_id
self._state = state
self._node = node
@@ -75,6 +78,7 @@ class NodeInfo(object):
self._lock = locking.get_lock(uuid)
# Whether lock was acquired using this NodeInfo object
self._fsm = None
self._options = None
def __del__(self):
if self._lock.is_locked():
@@ -123,41 +127,28 @@ class NodeInfo(object):
@property
def version_id(self):
"""Get the version id"""
"""Deprecated - Get the version id"""
if self._version_id is None:
row = db.model_query(db.Node).get(self.uuid)
if row is None:
try:
row = db.get_node(self.uuid)
except utils.NodeNotFoundInDBError:
raise utils.NotFoundInCacheError(_('Node not found in the '
'cache'), node_info=self)
self._version_id = row.version_id
return self._version_id
def _set_version_id(self, value, session):
row = self._row(session)
row.version_id = value
row.save(session)
self._version_id = value
def _row(self, session=None):
"""Get a row from the database with self.uuid and self.version_id"""
try:
# race condition if version_id changed outside of this node_info
return db.model_query(db.Node, session=session).filter_by(
uuid=self.uuid, version_id=self.version_id).one()
except (orm_errors.NoResultFound, orm_errors.StaleDataError):
raise utils.NodeStateRaceCondition(node_info=self)
def _commit(self, **fields):
"""Commit the fields into the DB."""
LOG.debug('Committing fields: %s', fields, node_info=self)
with db.ensure_transaction() as session:
self._set_version_id(uuidutils.generate_uuid(), session)
row = self._row(session)
row.update(fields)
try:
db.update_node(self.uuid,
**fields)
except utils.NodeNotFoundInDBError:
raise utils.NodeStateRaceCondition(node_info=self)
def commit(self):
"""Commit current node status into the database."""
# state and version_id are updated separately
self._commit(started_at=self.started_at, finished_at=self.finished_at,
error=self.error)
@@ -165,7 +156,7 @@ class NodeInfo(object):
def state(self):
"""State of the node_info object."""
if self._state is None:
row = self._row()
row = db.get_node(self.uuid)
self._state = row.state
return self._state
@@ -183,6 +174,7 @@ class NodeInfo(object):
@contextlib.contextmanager
def _fsm_ctx(self):
fsm = self._get_fsm()
try:
yield fsm
finally:
@@ -203,6 +195,7 @@ class NodeInfo(object):
:strict: whether to fail the introspection upon an invalid event
:raises: NodeStateInvalidEvent
"""
with self._fsm_ctx() as fsm:
LOG.debug('Executing fsm(%(state)s).process_event(%(event)s)',
{'state': fsm.current_state, 'event': event},
@@ -223,8 +216,7 @@ class NodeInfo(object):
def options(self):
"""Node introspection options as a dict."""
if self._options is None:
rows = db.model_query(db.Option).filter_by(
uuid=self.uuid)
rows = db.list_nodes_options_by_uuid(self.uuid)
self._options = {row.name: json.loads(row.value)
for row in rows}
return self._options
@@ -234,8 +226,7 @@ class NodeInfo(object):
"""Node look up attributes as a dict."""
if self._attributes is None:
self._attributes = {}
rows = db.model_query(db.Attribute).filter_by(
node_uuid=self.uuid)
rows = db.get_attributes(node_uuid=self.uuid)
for row in rows:
self._attributes.setdefault(row.name, []).append(row.value)
return self._attributes
@@ -256,11 +247,13 @@ class NodeInfo(object):
"""Set an option for a node."""
encoded = json.dumps(value)
self.options[name] = value
with db.ensure_transaction() as session:
db.model_query(db.Option, session=session).filter_by(
with db.session_for_write() as session:
# NOTE(TheJulia): This needs to move to the DB API at some
# point in the future.
session.query(db_model.Option).filter_by(
uuid=self.uuid, name=name).delete()
db.Option(uuid=self.uuid, name=name, value=encoded).save(
session)
opt = db_model.Option(uuid=self.uuid, name=name, value=encoded)
session.add(opt)
def finished(self, event, error=None):
"""Record status for this node and process a terminal transition.
@@ -274,31 +267,24 @@ class NodeInfo(object):
self.release_lock()
self.finished_at = timeutils.utcnow()
self.error = error
db.delete_attributes(self.uuid)
db.delete_options(uuid=self.uuid)
self.fsm_event(event)
self._commit(finished_at=self.finished_at,
error=self.error)
with db.ensure_transaction() as session:
self.fsm_event(event)
self._commit(finished_at=self.finished_at, error=self.error)
db.model_query(db.Attribute, session=session).filter_by(
node_uuid=self.uuid).delete()
db.model_query(db.Option, session=session).filter_by(
uuid=self.uuid).delete()
def add_attribute(self, name, value, session=None):
def add_attribute(self, name, value):
"""Store look up attribute for a node in the database.
:param name: attribute name
:param value: attribute value or list of possible values
:param session: optional existing database session
"""
if not isinstance(value, list):
value = [value]
with db.ensure_transaction(session) as session:
for v in value:
db.Attribute(uuid=uuidutils.generate_uuid(), name=name,
value=v, node_uuid=self.uuid).save(session)
# Invalidate attributes so they're loaded on next usage
self._attributes = None
db.set_attribute(node_uuid=self.uuid, name=name, values=value)
# Invalidate attributes so they're loaded on next usage
self._attributes = None
@classmethod
def from_row(cls, row, ironic=None, node=None):
@@ -674,25 +660,27 @@ def start_introspection(uuid, **kwargs):
node_info cache and the DB
:returns: NodeInfo
"""
with db.ensure_transaction():
node_info = NodeInfo(uuid)
# check that the start transition is possible
try:
node_info.fsm_event(istate.Events.start)
except utils.NotFoundInCacheError:
# node not found while in the fsm_event handler
LOG.debug('Node missing in the cache; adding it now',
node_info=node_info)
node_info = NodeInfo(uuid)
# check that the start transition is possible
try:
node_info.fsm_event(istate.Events.start)
except (utils.NotFoundInCacheError, utils.NodeNotFoundInDBError):
# node not found while in the fsm_event handler
LOG.debug('Node missing in the cache; adding it now',
node_info=node_info)
state = istate.States.starting
# A NodeNotFoundInDBError is also raised when the node is in the
# cache but its state has to be loaded from a DB row that is gone.
else:
recorded_state = node_info.state
if istate.States.error == recorded_state:
# If there was a failure, return to starting state to avoid
# letting the cache block new runs from occurring.
state = istate.States.starting
else:
recorded_state = node_info.state
if istate.States.error == recorded_state:
# If there was a failure, return to starting state to avoid
# letting the cache block new runs from occurring.
state = istate.States.starting
else:
state = recorded_state
return add_node(uuid, state, **kwargs)
state = recorded_state
return add_node(uuid, state, **kwargs)
def add_node(uuid, state, manage_boot=True, **attributes):
@@ -709,20 +697,39 @@ def add_node(uuid, state, manage_boot=True, **attributes):
:returns: NodeInfo
"""
started_at = timeutils.utcnow()
with db.ensure_transaction() as session:
_delete_node(uuid)
version_id = uuidutils.generate_uuid()
db.Node(uuid=uuid, state=state, version_id=version_id,
started_at=started_at, manage_boot=manage_boot).save(session)
with db.session_for_write() as session:
# TODO(TheJulia): This needs to be moved to the DBAPI, but for change
# reviewer sanity, is here for now.
session.execute(
delete(db_model.Attribute).where(
db_model.Attribute.node_uuid == uuid))
# Delete introspection data
session.execute(
delete(db_model.Option).where(
db_model.Option.uuid == uuid))
session.execute(
delete(db_model.IntrospectionData).where(
db_model.IntrospectionData.uuid == uuid))
# Delete the actual node
session.execute(
delete(db_model.Node).where(
db_model.Node.uuid == uuid
).execution_options(synchronize_session=False)
)
node = db_model.Node(uuid=uuid, state=state, started_at=started_at,
finished_at=None, error=None,
manage_boot=manage_boot)
session.add(node)
node_info = NodeInfo(uuid=uuid, state=state, started_at=started_at,
version_id=version_id, manage_boot=manage_boot,
ironic=attributes.pop('ironic', None))
for (name, value) in attributes.items():
if not value:
continue
node_info.add_attribute(name, value, session=session)
node_info = NodeInfo(uuid=uuid, state=state,
started_at=started_at,
ironic=attributes.pop('ironic', None),
manage_boot=manage_boot)
for (name, value) in attributes.items():
if not value:
continue
node_info.add_attribute(name, value)
return node_info
@@ -736,35 +743,18 @@ def delete_nodes_not_in_list(uuids):
LOG.warning('Node %s was deleted from Ironic, dropping from Ironic '
'Inspector database', uuid)
with locking.get_lock(uuid):
_delete_node(uuid)
def _delete_node(uuid, session=None):
"""Delete information about a node.
:param uuid: Ironic node UUID
:param session: optional existing database session
"""
with db.ensure_transaction(session) as session:
db.model_query(db.Attribute, session=session).filter_by(
node_uuid=uuid).delete()
for model in (db.Option, db.IntrospectionData, db.Node):
db.model_query(model,
session=session).filter_by(uuid=uuid).delete()
db.delete_node(uuid=uuid)
def introspection_active():
"""Check if introspection is active for at least one node."""
# FIXME(dtantsur): is there a better way to express it?
return (db.model_query(db.Node.uuid).filter_by(finished_at=None).first()
is not None)
return bool(db.get_active_nodes())
def active_macs():
"""List all MAC's that are on introspection right now."""
query = (db.model_query(db.Attribute.value).join(db.Node)
.filter(db.Attribute.name == MACS_ATTRIBUTE))
return {x.value for x in query}
return {x.value for x in db.get_attributes(name=MACS_ATTRIBUTE)}
def _list_node_uuids():
@@ -772,7 +762,7 @@ def _list_node_uuids():
:returns: Set of nodes' uuid.
"""
return {x.uuid for x in db.model_query(db.Node.uuid)}
return {x.uuid for x in db.get_nodes()}
def get_node(node_id, ironic=None):
@@ -789,11 +779,12 @@ def get_node(node_id, ironic=None):
node = ir_utils.get_node(node_id, ironic=ironic)
uuid = node.id
row = db.model_query(db.Node).filter_by(uuid=uuid).first()
if row is None:
try:
row = db.get_node(uuid)
return NodeInfo.from_row(row, ironic=ironic, node=node)
except utils.NodeNotFoundInDBError:
raise utils.Error(_('Could not find node %s in cache') % uuid,
code=404)
return NodeInfo.from_row(row, ironic=ironic, node=node)
def find_node(**attributes):
@@ -820,11 +811,9 @@ def find_node(**attributes):
LOG.debug('Trying to use %s of value %s for node look up',
name, value)
query = db.model_query(db.Attribute.node_uuid)
pairs = [(db.Attribute.name == name) &
(db.Attribute.value == v) for v in value]
query = query.filter(functools.reduce(operator.or_, pairs))
found.update(row.node_uuid for row in query.distinct().all())
attr_list = [(name, v) for v in value]
rows = db.list_nodes_by_attributes(attr_list)
found.update(row.node_uuid for row in rows)
if not found:
raise utils.NotFoundInCacheError(_(
@@ -849,23 +838,20 @@ def find_node(**attributes):
uuid = found.pop()
node_info = NodeInfo(uuid=uuid, ironic=ironic)
node_info.acquire_lock()
try:
row = (db.model_query(db.Node.started_at, db.Node.finished_at).
filter_by(uuid=uuid).first())
if not row:
raise utils.Error(_(
'Could not find node %s in introspection cache, '
'probably it\'s not on introspection now') % uuid, code=404)
row = db.get_node(uuid)
if row.finished_at:
raise utils.Error(_(
'Introspection for node %(node)s already finished on '
'%(finish)s') % {'node': uuid, 'finish': row.finished_at})
# set the started_at field before returning so the caller
# has the data.
node_info.started_at = row.started_at
return node_info
except utils.NodeNotFoundInDBError:
raise utils.Error(_(
'Could not find node %s in introspection cache, '
'probably it\'s not on introspection now') % uuid, code=404)
except Exception:
with excutils.save_and_reraise_exception():
node_info.release_lock()
@@ -882,11 +868,9 @@ def clean_up():
if timeout <= 0:
return []
threshold = timeutils.utcnow() - datetime.timedelta(seconds=timeout)
uuids = [row.uuid for row in
db.model_query(db.Node.uuid).filter(
db.Node.started_at < threshold,
db.Node.finished_at.is_(None)).all()]
uuids = [row.uuid for row in
db.get_active_nodes(started_before=threshold)]
if not uuids:
return []
@@ -989,20 +973,29 @@ def get_node_list(ironic=None, marker=None, limit=None, state=None):
:returns: a list of NodeInfo instances.
"""
if marker is not None:
# uuid marker -> row marker for pagination
marker = db.model_query(db.Node).get(marker)
# Get the marker using the DB API as it closes the connection and
# does *not* orphan node data in memory.
marker = db.get_node(marker)
if marker is None:
raise utils.Error(_('Node not found for marker: %s') % marker,
code=404)
rows = db.model_query(db.Node)
if state:
rows = rows.filter(db.Node.state.in_(state))
# ordered based on (started_at, uuid); newer first
rows = db_utils.paginate_query(rows, db.Node, limit,
('started_at', 'uuid'),
marker=marker, sort_dir='desc')
return [NodeInfo.from_row(row, ironic=ironic) for row in rows]
with db.session_for_read() as session:
# TODO(TheJulia): This should be moved to the DB API, and out of the
# node cache code.
rows = session.query(db_model.Node)
if state:
rows = rows.filter(db_model.Node.state.in_(state))
# ordered based on (started_at, uuid); newer first
rows = db_utils.paginate_query(rows, db_model.Node, limit,
('started_at', 'uuid'),
marker=marker, sort_dir='desc')
result = [db_model.Node(uuid=entry.uuid, version_id=entry.version_id,
state=entry.state, started_at=entry.started_at,
finished_at=entry.finished_at,
error=entry.error,
manage_boot=entry.manage_boot)
for entry in rows.all()]
return result
def store_introspection_data(node_id, introspection_data, processed=True):
@@ -1013,18 +1006,12 @@ def store_introspection_data(node_id, introspection_data, processed=True):
:param processed: Specify the type of introspected data, set to False
indicates the data is unprocessed.
"""
with db.ensure_transaction() as session:
record = db.model_query(db.IntrospectionData,
session=session).filter_by(
uuid=node_id, processed=processed).first()
if record is None:
row = db.IntrospectionData()
row.update({'uuid': node_id, 'processed': processed,
'data': introspection_data})
session.add(row)
else:
record.update({'data': introspection_data})
session.flush()
# NOTE(TheJulia): For compatibility; note that there are two modes of
# introspection data storage, the DB and originally swift.
db.store_introspection_data(
node_id=node_id,
introspection_data=introspection_data,
processed=processed)
def get_introspection_data(node_id, processed=True):
@@ -1035,12 +1022,6 @@ def get_introspection_data(node_id, processed=True):
indicates retrieving the unprocessed data.
:return: A dictionary representation of introspected data
"""
try:
ref = db.model_query(db.IntrospectionData).filter_by(
uuid=node_id, processed=processed).one()
return ref['data']
except orm_errors.NoResultFound:
msg = _('Introspection data not found for node %(node)s, '
'processed=%(processed)s') % {'node': node_id,
'processed': processed}
raise utils.IntrospectionDataNotFound(msg)
# NOTE(TheJulia): Moved to db api, here for compatibility.
return db.get_introspection_data(node_id=node_id,
processed=processed)

View File

@@ -20,7 +20,7 @@ from oslo_config import cfg
from oslo_utils import excutils
from ironic_inspector.common import swift
from ironic_inspector import node_cache
from ironic_inspector.db import api as db
from ironic_inspector import utils
@@ -99,7 +99,7 @@ class DatabaseStore(object):
def get(self, node_uuid, processed=True, get_json=False):
LOG.debug('Fetching introspection data from database for %(node)s',
{'node': node_uuid})
data = node_cache.get_introspection_data(node_uuid, processed)
data = db.get_introspection_data(node_uuid, processed)
if get_json:
return data
return json.dumps(data)
@@ -107,8 +107,8 @@ class DatabaseStore(object):
def save(self, node_uuid, data, processed=True):
introspection_data = _filter_data_excluded_keys(data)
try:
node_cache.store_introspection_data(node_uuid,
introspection_data, processed)
db.store_introspection_data(node_uuid,
introspection_data, processed)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.exception('Failed to store introspection data in '

View File

@@ -171,7 +171,6 @@ def store_introspection_data(node_uuid, data, processed=True):
def _store_unprocessed_data(node_uuid, data):
# runs in background
try:
store_introspection_data(node_uuid, data, processed=False)
except Exception:
@@ -208,7 +207,6 @@ def process(introspection_data):
# Locking is already done in find_node() but may be not done in a
# node_not_found hook
node_info.acquire_lock()
if failures or node_info is None:
msg = _('The following failures happened during running '
'pre-processing hooks:\n%s') % '\n'.join(failures)
@@ -216,7 +214,6 @@ def process(introspection_data):
node_info.finished(istate.Events.error, error='\n'.join(failures))
_store_logs(introspection_data, node_info)
raise utils.Error(msg, node_info=node_info, data=introspection_data)
LOG.info('Matching node is %s', node_info.uuid,
node_info=node_info, data=introspection_data)
@@ -225,12 +222,9 @@ def process(introspection_data):
raise utils.Error(_('Node processing already finished with '
'error: %s') % node_info.error,
node_info=node_info, code=400)
# Note(mkovacik): store data now when we're sure that a background
# thread won't race with other process() or introspect.abort()
# call
utils.executor().submit(_store_unprocessed_data, node_info.uuid,
unprocessed_data)
# NOTE(TheJulia): this was previously called as a background
# process, but we can't do that with sqlite.
_store_unprocessed_data(node_info.uuid, unprocessed_data)
try:
node = node_info.node()

View File

@@ -16,13 +16,11 @@
import jsonpath_rw as jsonpath
import jsonschema
from oslo_db import exception as db_exc
from oslo_utils import timeutils
from oslo_utils import uuidutils
from sqlalchemy import orm
from ironic_inspector.common.i18n import _
from ironic_inspector import db
from ironic_inspector.db import api as db
from ironic_inspector.db import model
from ironic_inspector.plugins import base as plugins_base
from ironic_inspector import utils
@@ -387,28 +385,7 @@ def create(conditions_json, actions_json, uuid=None,
conditions = _validate_conditions(conditions_json)
actions = _validate_actions(actions_json)
try:
with db.ensure_transaction() as session:
rule = db.Rule(uuid=uuid, description=description, disabled=False,
created_at=timeutils.utcnow(), scope=scope)
for field, op, multiple, invert, params in conditions:
rule.conditions.append(db.RuleCondition(op=op,
field=field,
multiple=multiple,
invert=invert,
params=params))
for action, params in actions:
rule.actions.append(db.RuleAction(action=action,
params=params))
rule.save(session)
except db_exc.DBDuplicateEntry as exc:
LOG.error('Database integrity error %s when '
'creating a rule', exc)
raise utils.Error(_('Rule with UUID %s already exists') % uuid,
code=409)
rule = db.create_rule(uuid, conditions, actions, description, scope)
LOG.info('Created rule %(uuid)s with description "%(descr)s" '
'and scope "%(scope)s"',
@@ -422,11 +399,7 @@ def create(conditions_json, actions_json, uuid=None,
def get(uuid):
"""Get a rule by its UUID."""
try:
rule = db.model_query(db.Rule).filter_by(uuid=uuid).one()
except orm.exc.NoResultFound:
raise utils.Error(_('Rule %s was not found') % uuid, code=404)
rule = db.get_rule(uuid=uuid)
return IntrospectionRule(uuid=rule.uuid, actions=rule.actions,
conditions=rule.conditions,
description=rule.description,
@@ -435,35 +408,25 @@ def get(uuid):
def get_all():
"""List all rules."""
query = db.model_query(db.Rule).order_by(db.Rule.created_at)
return [IntrospectionRule(uuid=rule.uuid, actions=rule.actions,
conditions=rule.conditions,
description=rule.description,
scope=rule.scope)
for rule in query]
with db.session_for_read() as session:
query = session.query(model.Rule).order_by(model.Rule.created_at)
return [IntrospectionRule(uuid=rule.uuid, actions=rule.actions,
conditions=rule.conditions,
description=rule.description,
scope=rule.scope)
for rule in query.all()]
def delete(uuid):
"""Delete a rule by its UUID."""
with db.ensure_transaction() as session:
db.model_query(db.RuleAction,
session=session).filter_by(rule=uuid).delete()
db.model_query(db.RuleCondition,
session=session) .filter_by(rule=uuid).delete()
count = (db.model_query(db.Rule, session=session)
.filter_by(uuid=uuid).delete())
if not count:
raise utils.Error(_('Rule %s was not found') % uuid, code=404)
db.delete_rule(uuid)
LOG.info('Introspection rule %s was deleted', uuid)
def delete_all():
"""Delete all rules."""
with db.ensure_transaction() as session:
db.model_query(db.RuleAction, session=session).delete()
db.model_query(db.RuleCondition, session=session).delete()
db.model_query(db.Rule, session=session).delete()
db.delete_all_rules()
LOG.info('All introspection rules were deleted')

View File

@@ -19,6 +19,7 @@ import fixtures
import futurist
from oslo_concurrency import lockutils
from oslo_config import fixture as config_fixture
from oslo_db.sqlalchemy import enginefacade
from oslo_log import log
from oslo_utils import units
from oslo_utils import uuidutils
@@ -27,7 +28,9 @@ from oslotest import base as test_base
from ironic_inspector.common import i18n
import ironic_inspector.conf
from ironic_inspector.conf import opts as conf_opts
from ironic_inspector import db
from ironic_inspector.db import api as db
from ironic_inspector.db import migration
from ironic_inspector.db import model as db_model
from ironic_inspector import introspection_state as istate
from ironic_inspector import node_cache
from ironic_inspector.plugins import base as plugins_base
@@ -36,20 +39,61 @@ from ironic_inspector import utils
CONF = ironic_inspector.conf.CONF
_DB_CACHE = None
class Database(fixtures.Fixture):
def __init__(self, engine, db_migrate, sql_connection, functional):
self.sql_connection = sql_connection
self.engine = engine
self.db_migrate = db_migrate
self.functional = functional
self.engine.dispose()
conn = self.engine.connect()
if not functional:
self.setup_sqlite(db_migrate, engine, conn)
self._DB = "".join(line for line in conn.connection.iterdump())
conn.close()
self.engine.dispose()
def setup_sqlite(self, db_migrate, engine, conn):
if db_migrate.version(engine=engine):
return
db_model.Base.metadata.create_all(conn)
db_migrate.stamp('head')
def setUp(self):
super(Database, self).setUp()
conn = self.engine.connect()
if not self.db_migrate.version():
conn.connection.executescript(self._DB)
self.addCleanup(self.engine.dispose)
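# NOTE: the fixture builds the migrated SQLite schema once, caches it
# as a SQL dump (iterdump) in __init__, and replays that dump for each
# test, which is far cheaper than re-running the alembic migrations
# per test case.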
class BaseTest(test_base.BaseTestCase):
IS_FUNCTIONAL = False
def setUp(self):
CONF.set_override('connection_trace', True, group='database')
CONF.set_override('connection_debug', 100, group='database')
super(BaseTest, self).setUp()
if not self.IS_FUNCTIONAL:
self.init_test_conf()
self.session = db.get_writer_session()
engine = self.session.get_bind()
db.Base.metadata.create_all(engine)
engine.connect()
self.addCleanup(engine.dispose)
global _DB_CACHE
if not _DB_CACHE:
engine = enginefacade.writer.get_engine()
_DB_CACHE = Database(engine, migration,
sql_connection=CONF.database.connection,
functional=self.IS_FUNCTIONAL)
self.useFixture(_DB_CACHE)
plugins_base.reset()
node_cache._SEMAPHORES = lockutils.Semaphores()
patch = mock.patch.object(i18n, '_', lambda s: s)
@ -167,9 +211,9 @@ class InventoryTest(BaseTest):
}
class NodeTest(InventoryTest):
class NodeTestBase(InventoryTest):
def setUp(self):
super(NodeTest, self).setUp()
super(NodeTestBase, self).setUp()
self.uuid = uuidutils.generate_uuid()
fake_node = {
@ -203,15 +247,32 @@ class NodeTest(InventoryTest):
fixtures.MockPatchObject(time, 'sleep', autospec=True))
class NodeStateTest(NodeTest):
class NodeTest(NodeTestBase):
def setUp(self):
super(NodeTest, self).setUp()
with db.session_for_write() as session:
self.db_node = db_model.Node(
uuid=self.node_info.uuid,
version_id=self.node_info._version_id,
state=self.node_info._state,
started_at=self.node_info.started_at,
finished_at=self.node_info.finished_at,
error=self.node_info.error)
session.add(self.db_node)
class NodeStateTest(NodeTestBase):
def setUp(self):
super(NodeStateTest, self).setUp()
self.node_info._version_id = uuidutils.generate_uuid()
self.node_info._state = istate.States.starting
self.db_node = db.Node(uuid=self.node_info.uuid,
version_id=self.node_info._version_id,
state=self.node_info._state,
started_at=self.node_info.started_at,
finished_at=self.node_info.finished_at,
error=self.node_info.error)
self.db_node.save(self.session)
with db.session_for_write() as session:
self.db_node = db_model.Node(
uuid=self.node_info.uuid,
version_id=self.node_info._version_id,
state=self.node_info._state,
started_at=self.node_info.started_at,
finished_at=self.node_info.finished_at,
error=self.node_info.error)
session.add(self.db_node)
session.commit()

View File

@ -36,14 +36,14 @@ import requests
from ironic_inspector.cmd import all as inspector_cmd
from ironic_inspector.cmd import dbsync
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector import db
from ironic_inspector.db import api as db
from ironic_inspector import introspection_state as istate
from ironic_inspector import main
from ironic_inspector import node_cache
from ironic_inspector import rules
from ironic_inspector.test import base
from ironic_inspector.test.unit import test_rules
eventlet.monkey_patch()
CONF = """
@ -105,6 +105,7 @@ def _query_string(*field_names):
class Base(base.NodeTest):
ROOT_URL = 'http://127.0.0.1:5050'
IS_FUNCTIONAL = True
def setUp(self):
@ -146,7 +147,7 @@ class Base(base.NodeTest):
def tearDown(self):
super(Base, self).tearDown()
node_cache._delete_node(self.uuid)
db.delete_node(self.uuid)
def call(self, method, endpoint, data=None, expect_error=None,
api_version=None):
@ -243,17 +244,18 @@ class Base(base.NodeTest):
def db_row(self):
"""return database row matching self.uuid."""
return db.model_query(db.Node).get(self.uuid)
return db.get_node(self.uuid)
class Test(Base):
def test_bmc(self):
self.call_introspect(self.uuid)
eventlet.greenthread.sleep(DEFAULT_SLEEP)
self.cli.set_node_power_state.assert_called_once_with(self.uuid,
'rebooting')
status = self.call_get_status(self.uuid)
self.check_status(status, finished=False, state=istate.States.waiting)
res = self.call_continue(self.data)
@ -381,11 +383,9 @@ class Test(Base):
status = self.call_get_status(self.uuid)
self.check_status(status, finished=True, state=istate.States.finished)
# fetch all statuses and db nodes to assert pagination
statuses = self.call_get_statuses().get('introspection')
nodes = db.model_query(db.Node).order_by(
db.Node.started_at.desc()).all()
nodes = db.get_nodes()
# assert ordering
self.assertEqual([node.uuid for node in nodes],
@ -694,10 +694,8 @@ class Test(Base):
# waiting -> processing is a strict state transition
self.call_introspect(self.uuid)
eventlet.greenthread.sleep(DEFAULT_SLEEP)
row = self.db_row()
row.state = istate.States.processing
with db.ensure_transaction() as session:
row.save(session)
db.update_node(self.uuid, state=istate.States.processing)
self.call_continue(self.data, expect_error=400)
status = self.call_get_status(self.uuid)
self.check_status(status, finished=True, state=istate.States.error,
@ -830,7 +828,6 @@ def mocked_server():
except requests.ConnectionError:
if i == 9:
raise
print('Service did not start yet')
eventlet.greenthread.sleep(3)
else:
break

View File

@ -0,0 +1,146 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from oslo_utils import uuidutils
from ironic_inspector.db import api as db
from ironic_inspector import introspection_state as istate
from ironic_inspector.test import base as test_base
from ironic_inspector import utils
class TestDBAPI(test_base.NodeStateTest):
def setUp(self):
super(TestDBAPI, self).setUp()
self.node2 = db.create_node(uuid=uuidutils.generate_uuid(),
state=istate.States.processing,
started_at=datetime.datetime(1, 1, 2),
finished_at=None)
self.attribute2 = ('fake_attr', 'boo')
db.set_attribute(self.node2.uuid, *self.attribute2)
self.option2 = ('fake_opt', 'foo')
db.set_option(self.node2.uuid, *self.option2)
def test_get_nodes(self):
nodes = db.get_nodes()
        self.assertCountEqual([self.node2.uuid, self.uuid],
[node.uuid for node in nodes])
def test_get_node_by_uuid(self):
node = db.get_node(self.uuid)
self.assertEqual(self.uuid, node.uuid)
def test_get_node_by_uuid_not_found(self):
self.assertRaises(
utils.NodeNotFoundInDBError,
db.get_node,
uuidutils.generate_uuid())
def test_get_node_by_uuid_version_mismatch(self):
self.assertRaises(
utils.NodeNotFoundInDBError,
db.get_node,
self.node2.uuid, version_id=123)
def test_get_active_nodes(self):
nodes = db.get_active_nodes()
        self.assertCountEqual([self.node2.uuid, self.uuid],
[node.uuid for node in nodes])
def test_get_active_nodes_before(self):
nodes = db.get_active_nodes(started_before=datetime.datetime(1, 1, 2))
        self.assertCountEqual([self.uuid],
[node.uuid for node in nodes])
def test_list_nodes_by_attributes(self):
attrs = db.list_nodes_by_attributes([self.attribute2])
        self.assertCountEqual([self.node2.uuid],
[attr.node_uuid for attr in attrs])
def test_list_nodes_options_by_uuid(self):
opts = db.list_nodes_options_by_uuid(self.node2.uuid)
        self.assertCountEqual([self.option2],
[(opt.name, opt.value) for opt in opts])
def test_update_node(self):
db.update_node(self.node2.uuid, state=istate.States.finished)
node2 = db.get_node(self.node2.uuid)
self.assertNotEqual(self.node2.state, node2.state)
self.assertEqual(istate.States.finished, node2.state)
def test_update_node_raises_exception(self):
self.assertRaises(utils.NodeNotFoundInDBError,
db.update_node,
uuidutils.generate_uuid(),
error='foo')
    def test_add_node(self):
db.add_node(
uuid=uuidutils.generate_uuid(),
state=istate.States.finished)
        self.assertEqual(3, len(db.get_nodes()))
def test_delete_node(self):
db.delete_node(self.node2.uuid)
self.assertRaises(utils.NodeNotFoundInDBError,
db.get_node,
self.node2.uuid)
self.assertEqual([], db.get_attributes(node_uuid=self.node2.uuid))
self.assertEqual([], db.get_options(uuid=self.node2.uuid))
def test_delete_nodes(self):
db.delete_nodes()
self.assertEqual([], db.get_nodes())
def test_delete_nodes_finished(self):
db.delete_nodes(finished_until=datetime.datetime(4, 4, 4))
        self.assertCountEqual([self.uuid, self.node2.uuid],
[node.uuid for node in db.get_nodes()])
def test_delete_options(self):
db.delete_options(uuid=self.node2.uuid)
self.assertEqual([], db.get_options(uuid=self.node2.uuid))
def test_delete_attributes(self):
node3 = db.create_node(uuid=uuidutils.generate_uuid(),
state=istate.States.finished,
started_at=datetime.datetime(1, 1, 3),
finished_at=datetime.datetime(1, 1, 4))
attribute3 = ('fake_attr', 'boo')
db.set_attribute(node3.uuid, *attribute3)
db.delete_attributes(node3.uuid)
self.assertEqual(
[], db.get_attributes(node_uuid=node3.uuid))
def test_store_introspection_data(self):
node = db.create_node(uuid=uuidutils.generate_uuid(),
state=istate.States.finished,
started_at=datetime.datetime(1, 1, 3),
finished_at=datetime.datetime(1, 1, 4))
db.store_introspection_data(node.uuid, {'foo': 'bar'})
res = db.get_introspection_data(node.uuid)
self.assertEqual(res['foo'], 'bar')

View File

@ -42,7 +42,8 @@ from oslotest import base as test_base
import sqlalchemy
from ironic_inspector.cmd import dbsync
from ironic_inspector import db
from ironic_inspector.db import api as db
from ironic_inspector.db import model as db_model
from ironic_inspector import introspection_state as istate
from ironic_inspector.test import base
@ -53,12 +54,10 @@ LOG = logging.getLogger(__name__)
@contextlib.contextmanager
def patch_with_engine(engine):
with mock.patch.object(db, 'get_writer_session',
autospec=True) as patch_w_sess, \
mock.patch.object(db, 'get_reader_session',
autospec=True) as patch_r_sess:
autospec=True) as patch_w_sess:
# FIXME(stephenfin): we need to remove reliance on autocommit semantics
# ASAP since it's not compatible with SQLAlchemy 2.0
patch_w_sess.return_value = patch_r_sess.return_value = (
patch_w_sess.return_value = (
orm.get_maker(engine, autocommit=True)())
yield
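As the FIXME notes, ``autocommit=True`` sessions do not exist in SQLAlchemy 2.0. A hypothetical 2.0-compatible variant of this helper (not part of this change) would hand out a plain session and leave transaction management to explicit ``begin()``/``commit()`` calls:

import contextlib
from unittest import mock

from sqlalchemy import orm as sa_orm

from ironic_inspector.db import api as db


@contextlib.contextmanager
def patch_with_engine_20(engine):
    # A plain sessionmaker: the session "autobegins" a transaction on
    # first use and requires an explicit commit, which is the only
    # behaviour SQLAlchemy 2.0 supports.
    maker = sa_orm.sessionmaker(bind=engine)
    with mock.patch.object(db, 'get_writer_session',
                           autospec=True) as patch_w_sess:
        patch_w_sess.return_value = maker()
        yield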
@ -479,7 +478,7 @@ class ModelsMigrationSyncMixin(object):
self.engine = enginefacade.writer.get_engine()
def get_metadata(self):
return db.Base.metadata
return db_model.Base.metadata
def get_engine(self):
return self.engine

View File

@ -1,80 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from ironic_inspector import db
from ironic_inspector.test import base as test_base
class TestDB(test_base.NodeTest):
@mock.patch.object(db, 'get_reader_session', autospec=True)
def test_model_query(self, mock_reader):
mock_session = mock_reader.return_value
fake_query = mock_session.query.return_value
query = db.model_query('db.Node')
mock_reader.assert_called_once_with()
mock_session.query.assert_called_once_with('db.Node')
self.assertEqual(fake_query, query)
@mock.patch.object(db, 'get_writer_session', autospec=True)
def test_ensure_transaction_new_session(self, mock_writer):
mock_session = mock_writer.return_value
with db.ensure_transaction() as session:
mock_writer.assert_called_once_with()
mock_session.begin.assert_called_once_with(subtransactions=True)
self.assertEqual(mock_session, session)
@mock.patch.object(db, 'get_writer_session', autospec=True)
def test_ensure_transaction_session(self, mock_writer):
mock_session = mock.MagicMock()
with db.ensure_transaction(session=mock_session) as session:
self.assertFalse(mock_writer.called)
mock_session.begin.assert_called_once_with(subtransactions=True)
self.assertEqual(mock_session, session)
@mock.patch.object(db.enginefacade, 'transaction_context', autospec=True)
def test__create_context_manager(self, mock_cnxt):
mock_ctx_mgr = mock_cnxt.return_value
ctx_mgr = db._create_context_manager()
mock_ctx_mgr.configure.assert_called_once_with(
sqlite_fk=False,
__autocommit=True,
)
self.assertEqual(mock_ctx_mgr, ctx_mgr)
@mock.patch.object(db, 'get_context_manager', autospec=True)
def test_get_reader_session(self, mock_cnxt_mgr):
mock_cnxt = mock_cnxt_mgr.return_value
mock_sess_maker = mock_cnxt.reader.get_sessionmaker.return_value
session = db.get_reader_session()
mock_sess_maker.assert_called_once_with()
self.assertEqual(mock_sess_maker.return_value, session)
@mock.patch.object(db, 'get_context_manager', autospec=True)
def test_get_writer_session(self, mock_cnxt_mgr):
mock_cnxt = mock_cnxt_mgr.return_value
mock_sess_maker = mock_cnxt.writer.get_sessionmaker.return_value
session = db.get_writer_session()
mock_sess_maker.assert_called_once_with()
self.assertEqual(mock_sess_maker.return_value, session)

View File

@ -30,7 +30,7 @@ from ironic_inspector import utils
CONF = cfg.CONF
class BaseTest(test_base.NodeTest):
class BaseTest(test_base.NodeTestBase):
def setUp(self):
super(BaseTest, self).setUp()
introspect._LAST_INTROSPECTION_TIME = 0
@ -436,7 +436,6 @@ class TestIntrospect(BaseTest):
introspect.introspect(self.uuid)
self.sleep_fixture.mock.assert_called_once_with(8)
cli.set_node_boot_device.assert_called_once_with(self.uuid,
'pxe',
persistent=False)

View File

@ -48,8 +48,6 @@ class BaseManagerTest(test_base.NodeTest):
class TestManagerInitHost(BaseManagerTest):
def setUp(self):
super(TestManagerInitHost, self).setUp()
self.mock_db_init = self.useFixture(fixtures.MockPatchObject(
manager.db, 'init')).mock
self.mock_validate_processing_hooks = self.useFixture(
fixtures.MockPatchObject(manager.plugins_base,
'validate_processing_hooks')).mock
@ -95,7 +93,6 @@ class TestManagerInitHost(BaseManagerTest):
mock_coordinator = mock.MagicMock()
mock_get_coord.return_value = mock_coordinator
self.manager.init_host()
self.mock_db_init.assert_called_once_with()
self.mock_validate_processing_hooks.assert_called_once_with()
self.mock_filter.init_filter.assert_called_once_with()
self.assert_periodics()
@ -113,7 +110,6 @@ class TestManagerInitHost(BaseManagerTest):
self.mock_exit.side_effect = SystemExit('Stop!')
self.assertRaisesRegex(SystemExit, 'Stop!', self.manager.init_host)
self.mock_db_init.assert_called_once_with()
self.mock_log.critical.assert_called_once_with(str(error))
self.mock_exit.assert_called_once_with(1)
self.mock_filter.init_filter.assert_not_called()
@ -126,7 +122,6 @@ class TestManagerInitHost(BaseManagerTest):
mock_coordinator = mock.MagicMock()
mock_get_coord.return_value = mock_coordinator
self.manager.init_host()
self.mock_db_init.assert_called_once_with()
self.mock_validate_processing_hooks.assert_called_once_with()
self.mock_filter.init_filter.assert_called_once_with()
self.assert_periodics()
@ -140,7 +135,6 @@ class TestManagerInitHost(BaseManagerTest):
mock_coordinator = mock.MagicMock()
mock_get_coord.return_value = mock_coordinator
self.manager.init_host()
self.mock_db_init.assert_called_once_with()
self.mock_validate_processing_hooks.assert_called_once_with()
self.mock_filter.init_filter.assert_called_once_with()
self.assert_periodics()
@ -157,7 +151,6 @@ class TestManagerInitHost(BaseManagerTest):
'backend failed.'),
None)
self.assertRaises(tooz.ToozError, self.manager.init_host)
self.mock_db_init.assert_called_once_with()
self.mock_validate_processing_hooks.assert_not_called()
self.mock_filter.init_filter.assert_not_called()
self.assertIsNone(self.manager._periodics_worker)

View File

@ -14,7 +14,6 @@
import copy
import datetime
import functools
import json
import unittest
from unittest import mock
@ -26,7 +25,8 @@ from oslo_utils import uuidutils
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector.common import locking
from ironic_inspector import db
from ironic_inspector.db import api as db
from ironic_inspector.db import model as db_model
from ironic_inspector import introspection_state as istate
from ironic_inspector import node_cache
from ironic_inspector.test import base as test_base
@ -35,19 +35,16 @@ from ironic_inspector import utils
CONF = cfg.CONF
class TestNodeCache(test_base.NodeTest):
class TestNodeCache(test_base.NodeTestBase):
def test_add_node(self):
# Ensure previous node information is cleared
uuid2 = uuidutils.generate_uuid()
session = db.get_writer_session()
with session.begin():
db.Node(uuid=self.node.uuid,
state=istate.States.starting).save(session)
db.Node(uuid=uuid2,
state=istate.States.starting).save(session)
db.Attribute(uuid=uuidutils.generate_uuid(), name='mac',
value='11:22:11:22:11:22',
node_uuid=self.uuid).save(session)
db.create_node(uuid=self.node.uuid,
state=istate.States.starting)
db.create_node(uuid=uuid2,
state=istate.States.starting)
db.set_attribute(node_uuid=self.uuid, name='mac',
values=['11:22:11:22:11:22'])
node = node_cache.add_node(self.node.uuid,
istate.States.starting,
@ -60,145 +57,102 @@ class TestNodeCache(test_base.NodeTest):
datetime.datetime.utcnow() + datetime.timedelta(seconds=60)))
self.assertFalse(node._lock.is_locked())
res = set(db.model_query(db.Node.uuid,
db.Node.started_at).all())
res = set((r.uuid, r.started_at) for r in db.get_nodes())
expected = {(node.uuid, node.started_at), (uuid2, None)}
self.assertEqual(expected, res)
res = db.model_query(db.Node).get(self.uuid)
res = db.get_node(self.uuid)
self.assertIsNotNone(res.version_id)
res = (db.model_query(db.Attribute.name,
db.Attribute.value, db.Attribute.node_uuid).
order_by(db.Attribute.name, db.Attribute.value).all())
res = db.get_attributes(order_by=('name', 'value'))
self.assertEqual([('bmc_address', '1.2.3.4', self.uuid),
('mac', self.macs[0], self.uuid),
('mac', self.macs[1], self.uuid),
('mac', self.macs[2], self.uuid)],
[(row.name, row.value, row.node_uuid) for row in res])
def test__delete_node(self):
session = db.get_writer_session()
with session.begin():
db.Node(uuid=self.node.uuid,
state=istate.States.finished).save(session)
db.Attribute(uuid=uuidutils.generate_uuid(), name='mac',
value='11:22:11:22:11:22', node_uuid=self.uuid).save(
session)
data = {'s': 'value', 'b': True, 'i': 42}
encoded = json.dumps(data)
db.Option(uuid=self.uuid, name='name', value=encoded).save(
session)
node_cache._delete_node(self.uuid)
session = db.get_writer_session()
row_node = db.model_query(db.Node).filter_by(
uuid=self.uuid).first()
self.assertIsNone(row_node)
row_attribute = db.model_query(db.Attribute).filter_by(
node_uuid=self.uuid).first()
self.assertIsNone(row_attribute)
row_option = db.model_query(db.Option).filter_by(
uuid=self.uuid).first()
self.assertIsNone(row_option)
@mock.patch.object(locking, 'get_lock', autospec=True)
@mock.patch.object(node_cache, '_list_node_uuids', autospec=True)
@mock.patch.object(node_cache, '_delete_node', autospec=True)
@mock.patch.object(db, 'delete_node', autospec=True)
def test_delete_nodes_not_in_list(self, mock__delete_node,
mock__list_node_uuids,
mock_get_lock):
uuid2 = uuidutils.generate_uuid()
uuids = {self.uuid}
mock__list_node_uuids.return_value = {self.uuid, uuid2}
session = db.get_writer_session()
with session.begin():
node_cache.delete_nodes_not_in_list(uuids)
node_cache.delete_nodes_not_in_list(uuids)
mock__delete_node.assert_called_once_with(uuid2)
mock_get_lock.assert_called_once_with(uuid2)
mock_get_lock.return_value.__enter__.assert_called_once_with()
def test_active_macs(self):
session = db.get_writer_session()
uuid2 = uuidutils.generate_uuid()
with session.begin():
db.Node(uuid=self.node.uuid,
state=istate.States.starting).save(session)
db.Node(uuid=uuid2,
state=istate.States.starting,
manage_boot=False).save(session)
values = [('mac', '11:22:11:22:11:22', self.uuid),
('mac', '22:11:22:11:22:11', self.uuid),
('mac', 'aa:bb:cc:dd:ee:ff', uuid2)]
for value in values:
db.Attribute(uuid=uuidutils.generate_uuid(), name=value[0],
value=value[1], node_uuid=value[2]).save(session)
db.create_node(uuid=self.node.uuid,
state=istate.States.starting)
db.create_node(uuid=uuid2,
state=istate.States.starting)
values = [('mac', '11:22:11:22:11:22', self.uuid),
('mac', '22:11:22:11:22:11', self.uuid),
('mac', 'aa:bb:cc:dd:ee:ff', uuid2)]
for value in values:
db.set_attribute(node_uuid=value[2], name=value[0],
values=[value[1]])
self.assertEqual({'11:22:11:22:11:22', '22:11:22:11:22:11',
# We still need to serve DHCP to unmanaged nodes
'aa:bb:cc:dd:ee:ff'},
node_cache.active_macs())
def test__list_node_uuids(self):
session = db.get_writer_session()
uuid2 = uuidutils.generate_uuid()
with session.begin():
db.Node(uuid=self.node.uuid,
state=istate.States.starting).save(session)
db.Node(uuid=uuid2,
state=istate.States.starting).save(session)
db.create_node(uuid=self.node.uuid,
state=istate.States.starting)
db.create_node(uuid=uuid2,
state=istate.States.starting)
node_uuid_list = node_cache._list_node_uuids()
self.assertEqual({self.uuid, uuid2}, node_uuid_list)
def test_add_attribute(self):
session = db.get_writer_session()
with session.begin():
db.Node(uuid=self.node.uuid,
state=istate.States.starting).save(session)
db.create_node(uuid=self.node.uuid,
state=istate.States.starting)
node_info = node_cache.NodeInfo(uuid=self.uuid, started_at=42)
node_info.add_attribute('key', 'value')
res = db.model_query(db.Attribute.name,
db.Attribute.value,
db.Attribute.node_uuid,
session=session)
res = res.order_by(db.Attribute.name, db.Attribute.value).all()
self.assertEqual([('key', 'value', self.uuid)],
[tuple(row) for row in res])
res = db.get_attributes()
self.assertEqual(
[('key', 'value', self.uuid)],
[(row.name, row.value, row.node_uuid) for row in res])
# check that .attributes got invalidated and reloaded
self.assertEqual({'key': ['value']}, node_info.attributes)
def test_add_attribute_same_name(self):
session = db.get_writer_session()
with session.begin():
db.Node(uuid=self.node.uuid,
state=istate.States.starting).save(session)
db.create_node(uuid=self.node.uuid,
state=istate.States.starting)
node_info = node_cache.NodeInfo(uuid=self.uuid, started_at=42)
node_info.add_attribute('key', ['foo', 'bar'])
node_info.add_attribute('key', 'baz')
res = db.model_query(db.Attribute.name, db.Attribute.value,
db.Attribute.node_uuid, session=session)
res = res.order_by(db.Attribute.name, db.Attribute.value).all()
self.assertEqual([('key', 'bar', self.uuid),
('key', 'baz', self.uuid),
('key', 'foo', self.uuid)],
[tuple(row) for row in res])
res = db.get_attributes()
self.assertEqual([
('key', 'foo', self.uuid),
('key', 'bar', self.uuid),
('key', 'baz', self.uuid)],
[(row.name, row.value, row.node_uuid) for row in res])
def test_add_attribute_same_value(self):
session = db.get_writer_session()
with session.begin():
db.Node(uuid=self.node.uuid,
state=istate.States.starting).save(session)
db.create_node(uuid=self.node.uuid,
state=istate.States.starting)
node_info = node_cache.NodeInfo(uuid=self.uuid, started_at=42)
node_info.add_attribute('key', 'value')
node_info.add_attribute('key', 'value')
res = db.model_query(db.Attribute.name, db.Attribute.value,
db.Attribute.node_uuid, session=session)
self.assertEqual([('key', 'value', self.uuid),
('key', 'value', self.uuid)],
[tuple(row) for row in res])
res = db.get_attributes()
self.assertEqual([
('key', 'value', self.uuid),
('key', 'value', self.uuid)],
[(row.name, row.value, row.node_uuid) for row in res])
def test_attributes(self):
node_info = node_cache.add_node(self.uuid,
@ -209,10 +163,8 @@ class TestNodeCache(test_base.NodeTest):
'mac': self.macs},
node_info.attributes)
# check invalidation
session = db.get_writer_session()
with session.begin():
db.Attribute(uuid=uuidutils.generate_uuid(), name='foo',
value='bar', node_uuid=self.uuid).save(session)
db.set_attribute(self.uuid, name='foo', values=['bar'])
# still cached
self.assertEqual({'bmc_address': ['1.2.3.4'],
'mac': self.macs},
@ -223,7 +175,7 @@ class TestNodeCache(test_base.NodeTest):
node_info.attributes)
class TestNodeCacheFind(test_base.NodeTest):
class TestNodeCacheFind(test_base.NodeTestBase):
def setUp(self):
super(TestNodeCacheFind, self).setUp()
self.macs2 = ['00:00:00:00:00:00']
@ -301,18 +253,13 @@ class TestNodeCacheFind(test_base.NodeTest):
self.assertTrue(res._lock.is_locked())
def test_inconsistency(self):
session = db.get_writer_session()
with session.begin():
(db.model_query(db.Node).filter_by(uuid=self.uuid).
delete())
db.delete_node(uuid=self.uuid)
self.assertRaises(utils.Error, node_cache.find_node,
bmc_address='1.2.3.4')
def test_already_finished(self):
session = db.get_writer_session()
with session.begin():
(db.model_query(db.Node).filter_by(uuid=self.uuid).
update({'finished_at': datetime.datetime.utcnow()}))
db.update_node(self.uuid,
finished_at=datetime.datetime.utcnow())
self.assertRaises(utils.Error, node_cache.find_node,
bmc_address='1.2.3.4')
@ -322,36 +269,30 @@ class TestNodeCacheFind(test_base.NodeTest):
bmc_address="' OR ''='")
class TestNodeCacheCleanUp(test_base.NodeTest):
class TestNodeCacheCleanUp(test_base.NodeTestBase):
def setUp(self):
super(TestNodeCacheCleanUp, self).setUp()
self.started_at = datetime.datetime.utcnow()
session = db.get_writer_session()
with session.begin():
db.Node(uuid=self.uuid,
state=istate.States.waiting,
started_at=self.started_at).save(
session)
for v in self.macs:
db.Attribute(uuid=uuidutils.generate_uuid(), name='mac',
value=v, node_uuid=self.uuid).save(session)
db.Option(uuid=self.uuid, name='foo', value='bar').save(
session)
db.IntrospectionData(uuid=self.uuid, processed=False,
data={'fake': 'data'}).save(session)
db.create_node(uuid=self.uuid,
state=istate.States.waiting,
started_at=self.started_at)
db.set_attribute(self.uuid, name='mac', values=self.macs)
db.set_option(self.uuid, name='foo', value='bar')
db.store_introspection_data(
self.uuid, {'fake': 'data'}, processed=False)
def test_no_timeout(self):
CONF.set_override('timeout', 0)
self.assertFalse(node_cache.clean_up())
res = [tuple(row) for row in
db.model_query(db.Node.finished_at,
db.Node.error).all()]
res = [(row.finished_at, row.error) for row in
db.get_nodes()]
self.assertEqual([(None, None)], res)
self.assertEqual(len(self.macs),
db.model_query(db.Attribute).count())
self.assertEqual(1, db.model_query(db.Option).count())
self.assertCountEqual(self.macs,
[r.value for r in db.get_attributes()])
self.assertEqual(1, len(db.get_options()))
@mock.patch.object(locking, 'get_lock', autospec=True)
@mock.patch.object(timeutils, 'utcnow', autospec=True)
@ -360,13 +301,13 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
self.assertFalse(node_cache.clean_up())
res = [tuple(row) for row in db.model_query(
db.Node.finished_at, db.Node.error).all()]
res = [(row.finished_at, row.error) for row in
db.get_nodes()]
self.assertEqual([(None, None)], res)
self.assertEqual(len(self.macs),
db.model_query(db.Attribute).count())
self.assertEqual(1, db.model_query(db.Option).count())
self.assertEqual(1, db.model_query(db.IntrospectionData).count())
self.assertEqual(1, db.model_query(db_model.IntrospectionData).count())
self.assertCountEqual(self.macs,
[r.value for r in db.get_attributes()])
self.assertEqual(1, len(db.get_options()))
self.assertFalse(get_lock_mock.called)
@mock.patch.object(node_cache.NodeInfo, 'acquire_lock', autospec=True)
@ -374,20 +315,17 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
def test_timeout(self, time_mock, lock_mock):
        # Add a finished node to confirm we don't try to time it out
time_mock.return_value = self.started_at
session = db.get_writer_session()
finished_at = self.started_at + datetime.timedelta(seconds=60)
with session.begin():
db.Node(uuid=self.uuid + '1', started_at=self.started_at,
state=istate.States.waiting,
finished_at=finished_at).save(session)
db.create_node(uuid=uuidutils.generate_uuid(),
started_at=self.started_at,
state=istate.States.waiting,
finished_at=finished_at)
CONF.set_override('timeout', 99)
time_mock.return_value = (self.started_at +
datetime.timedelta(seconds=100))
self.assertEqual([self.uuid], node_cache.clean_up())
res = [(row.state, row.finished_at, row.error) for row in
db.model_query(db.Node).all()]
db.get_nodes()]
self.assertEqual(
[(istate.States.error,
self.started_at + datetime.timedelta(seconds=100),
@ -395,20 +333,19 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
(istate.States.waiting,
self.started_at + datetime.timedelta(seconds=60), None)],
res)
self.assertEqual([], db.model_query(db.Attribute).all())
self.assertEqual([], db.model_query(db.Option).all())
self.assertEqual([], db.get_attributes())
self.assertEqual([], db.get_options())
lock_mock.assert_called_once_with(mock.ANY, blocking=False)
@mock.patch.object(locking, 'get_lock', autospec=True)
@mock.patch.object(timeutils, 'utcnow', autospec=True)
def test_timeout_active_state(self, time_mock, lock_mock):
time_mock.return_value = self.started_at
session = db.get_writer_session()
db.update_node(self.uuid, state=istate.States.starting)
CONF.set_override('timeout', 1)
for state in [istate.States.starting, istate.States.enrolling,
istate.States.processing, istate.States.reapplying]:
db.model_query(db.Node, session=session).filter_by(
uuid=self.uuid).update({'state': state, 'finished_at': None})
db.update_node(self.uuid, state=state, finished_at=None)
current_time = self.started_at + datetime.timedelta(seconds=2)
time_mock.return_value = current_time
@ -416,7 +353,7 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
self.assertEqual([self.uuid], node_cache.clean_up())
res = [(row.state, row.finished_at, row.error) for row in
db.model_query(db.Node).all()]
db.get_nodes()]
self.assertEqual(
[(istate.States.error, current_time, 'Introspection timeout')],
res)
@ -433,20 +370,18 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
self.assertEqual([], node_cache.clean_up())
res = [(row.state, row.finished_at, row.error) for row in
db.model_query(db.Node).all()]
db.model_query(db_model.Node).all()]
self.assertEqual([('waiting', None, None)], res)
get_lock_mock.assert_called_once_with(mock.ANY, blocking=False)
class TestNodeCacheGetNode(test_base.NodeTest):
class TestNodeCacheGetNode(test_base.NodeTestBase):
def test_ok(self):
started_at = (datetime.datetime.utcnow() -
datetime.timedelta(seconds=42))
session = db.get_writer_session()
with session.begin():
db.Node(uuid=self.uuid,
state=istate.States.starting,
started_at=started_at).save(session)
db.create_node(uuid=self.uuid,
state=istate.States.starting,
started_at=started_at)
info = node_cache.get_node(self.uuid)
self.assertEqual(self.uuid, info.uuid)
@ -462,11 +397,10 @@ class TestNodeCacheGetNode(test_base.NodeTest):
def test_with_name(self):
started_at = (datetime.datetime.utcnow() -
datetime.timedelta(seconds=42))
session = db.get_writer_session()
with session.begin():
db.Node(uuid=self.uuid,
state=istate.States.starting,
started_at=started_at).save(session)
db.create_node(
uuid=self.uuid,
state=istate.States.starting,
started_at=started_at)
ironic = mock.Mock()
ironic.get_node.return_value = self.node
@ -481,7 +415,7 @@ class TestNodeCacheGetNode(test_base.NodeTest):
@mock.patch.object(timeutils, 'utcnow', lambda: datetime.datetime(1, 1, 1))
class TestNodeInfoFinished(test_base.NodeTest):
class TestNodeInfoFinished(test_base.NodeTestBase):
def setUp(self):
super(TestNodeInfoFinished, self).setUp()
node_cache.add_node(self.uuid,
@ -490,33 +424,26 @@ class TestNodeInfoFinished(test_base.NodeTest):
mac=self.macs)
self.node_info = node_cache.NodeInfo(
uuid=self.uuid, started_at=datetime.datetime(3, 1, 4))
session = db.get_writer_session()
with session.begin():
db.Option(uuid=self.uuid, name='foo', value='bar').save(
session)
db.set_option(self.uuid, name='foo', value='bar')
def test_success(self):
self.node_info.finished(istate.Events.finish)
session = db.get_writer_session()
with session.begin():
self.assertEqual((datetime.datetime(1, 1, 1), None),
tuple(db.model_query(
db.Node.finished_at,
db.Node.error).first()))
self.assertEqual([], db.model_query(db.Attribute,
session=session).all())
self.assertEqual([], db.model_query(db.Option,
session=session).all())
node = db.get_nodes()[0]
self.assertEqual((datetime.datetime(1, 1, 1), None),
(node.finished_at, node.error))
self.assertEqual([], db.get_attributes())
self.assertEqual([], db.get_options())
def test_error(self):
self.node_info.finished(istate.Events.error, error='boom')
node = db.get_nodes()[0]
self.assertEqual((datetime.datetime(1, 1, 1), 'boom'),
tuple(db.model_query(db.Node.finished_at,
db.Node.error).first()))
self.assertEqual([], db.model_query(db.Attribute).all())
self.assertEqual([], db.model_query(db.Option).all())
(node.finished_at, node.error))
self.assertEqual([], db.get_attributes())
self.assertEqual([], db.get_options())
def test_release_lock(self):
self.node_info.acquire_lock()
@ -524,7 +451,7 @@ class TestNodeInfoFinished(test_base.NodeTest):
self.assertFalse(self.node_info._lock.is_locked())
class TestNodeInfoOptions(test_base.NodeTest):
class TestNodeInfoOptions(test_base.NodeTestBase):
def setUp(self):
super(TestNodeInfoOptions, self).setUp()
node_cache.add_node(self.uuid,
@ -532,10 +459,7 @@ class TestNodeInfoOptions(test_base.NodeTest):
bmc_address='1.2.3.4',
mac=self.macs)
self.node_info = node_cache.NodeInfo(uuid=self.uuid, started_at=3.14)
session = db.get_writer_session()
with session.begin():
db.Option(uuid=self.uuid, name='foo', value='"bar"').save(
session)
db.set_option(self.uuid, name='foo', value='"bar"')
def test_get(self):
self.assertEqual({'foo': 'bar'}, self.node_info.options)
@ -629,7 +553,7 @@ class TestNodeCacheIronicObjects(unittest.TestCase):
node=self.uuid, limit=None, details=True)
class TestUpdate(test_base.NodeTest):
class TestUpdate(test_base.NodeTestBase):
def setUp(self):
super(TestUpdate, self).setUp()
self.ironic = mock.Mock()
@ -866,7 +790,7 @@ class TestUpdate(test_base.NodeTest):
node_info=self.node_info)
class TestNodeCacheGetByPath(test_base.NodeTest):
class TestNodeCacheGetByPath(test_base.NodeTestBase):
def setUp(self):
super(TestNodeCacheGetByPath, self).setUp()
self.node = mock.Mock(spec=['uuid', 'properties'],
@ -884,7 +808,7 @@ class TestNodeCacheGetByPath(test_base.NodeTest):
@mock.patch.object(locking.lockutils, 'internal_lock', autospec=True)
class TestInternalLock(test_base.NodeTest):
class TestInternalLock(test_base.NodeTestBase):
def test_acquire(self, lock_mock):
node_info = node_cache.NodeInfo(self.uuid)
self.addCleanup(node_info.release_lock)
@ -928,7 +852,7 @@ class TestInternalLock(test_base.NodeTest):
@mock.patch.object(node_cache, 'add_node', autospec=True)
@mock.patch.object(ir_utils, 'get_client', autospec=True)
class TestNodeCreate(test_base.NodeTest):
class TestNodeCreate(test_base.NodeTestBase):
def setUp(self):
super(TestNodeCreate, self).setUp()
self.mock_client = mock.Mock()
@ -971,19 +895,20 @@ class TestNodeCreate(test_base.NodeTest):
self.assertFalse(mock_add_node.called)
class TestNodeCacheListNode(test_base.NodeTest):
class TestNodeCacheListNode(test_base.NodeTestBase):
def setUp(self):
super(TestNodeCacheListNode, self).setUp()
self.uuid2 = uuidutils.generate_uuid()
self.uuid3 = uuidutils.generate_uuid()
session = db.get_writer_session()
with session.begin():
db.Node(uuid=self.uuid,
started_at=datetime.datetime(1, 1, 2)).save(session)
db.Node(uuid=self.uuid2, started_at=datetime.datetime(1, 1, 1),
finished_at=datetime.datetime(1, 1, 3)).save(session)
db.Node(uuid=self.uuid3, started_at=datetime.datetime(1, 1, 3),
state='error').save(session)
db.create_node(uuid=self.uuid,
state=istate.States.finished,
started_at=datetime.datetime(1, 1, 2))
db.create_node(uuid=self.uuid2,
state=istate.States.finished,
started_at=datetime.datetime(1, 1, 1),
finished_at=datetime.datetime(1, 1, 3))
db.create_node(uuid=self.uuid3, started_at=datetime.datetime(1, 1, 3),
state=istate.States.error)
# note that node(self.uuid).started_at > node(self.uuid2).started_at and
# that node_cache.get_node_list orders results strictly newest first
@ -1032,25 +957,6 @@ class TestNodeInfoVersionId(test_base.NodeStateTest):
self.assertRaisesRegex(utils.NotFoundInCacheError, '.*',
func)
def test_set(self):
with db.ensure_transaction() as session:
self.node_info._set_version_id(uuidutils.generate_uuid(),
session)
row = db.model_query(db.Node).get(self.node_info.uuid)
self.assertEqual(self.node_info.version_id, row.version_id)
def test_set_race(self):
with db.ensure_transaction() as session:
row = db.model_query(db.Node, session=session).get(
self.node_info.uuid)
row.update({'version_id': uuidutils.generate_uuid()})
row.save(session)
self.assertRaisesRegex(utils.NodeStateRaceCondition,
'Node state mismatch',
self.node_info._set_state,
istate.States.finished)
class TestNodeInfoState(test_base.NodeStateTest):
def test_get(self):
@ -1059,7 +965,7 @@ class TestNodeInfoState(test_base.NodeStateTest):
def test_set(self):
self.node_info._set_state(istate.States.finished)
row = db.model_query(db.Node).get(self.node_info.uuid)
row = db.get_node(self.node_info.uuid)
self.assertEqual(self.node_info.state, row.state)
def test_commit(self):
@ -1068,7 +974,7 @@ class TestNodeInfoState(test_base.NodeStateTest):
self.node_info.error = "Boo!"
self.node_info.commit()
row = db.model_query(db.Node).get(self.node_info.uuid)
row = db.get_node(self.node_info.uuid)
self.assertEqual(self.node_info.started_at, row.started_at)
self.assertEqual(self.node_info.finished_at, row.finished_at)
self.assertEqual(self.node_info.error, row.error)
@ -1263,7 +1169,7 @@ class TestFsmEvent(test_base.NodeStateTest):
@mock.patch.object(node_cache, 'add_node', autospec=True)
@mock.patch.object(node_cache, 'NodeInfo', autospec=True)
class TestStartIntrospection(test_base.NodeTest):
class TestStartIntrospection(test_base.NodeTestBase):
def prepare_mocks(fn):
@functools.wraps(fn)
def inner(self, NodeMock, *args):
@ -1309,12 +1215,19 @@ class TestStartIntrospection(test_base.NodeTest):
self.assertFalse(add_node_mock.called)
@prepare_mocks
def test_node_not_in_db(self, fsm_event_mock, add_node_mock):
def test_node_not_in_cache(self, fsm_event_mock, add_node_mock):
fsm_event_mock.side_effect = utils.NotFoundInCacheError('Oops!')
node_cache.start_introspection(self.node_info.uuid)
add_node_mock.assert_called_once_with(self.node_info.uuid,
istate.States.starting)
@prepare_mocks
def test_node_not_in_db(self, fsm_event_mock, add_node_mock):
fsm_event_mock.side_effect = utils.NodeNotFoundInDBError()
node_cache.start_introspection(self.node_info.uuid)
add_node_mock.assert_called_once_with(self.node_info.uuid,
istate.States.starting)
@prepare_mocks
def test_custom_exc_fsm_event(self, fsm_event_mock, add_node_mock):
class CustomError(Exception):
@ -1338,7 +1251,7 @@ class TestStartIntrospection(test_base.NodeTest):
istate.States.starting)
class TestIntrospectionDataDbStore(test_base.NodeTest):
class TestIntrospectionDataDbStore(test_base.NodeTestBase):
def setUp(self):
super(TestIntrospectionDataDbStore, self).setUp()
node_cache.add_node(self.node.uuid,
@ -1384,7 +1297,7 @@ class TestIntrospectionDataDbStore(test_base.NodeTest):
@mock.patch.object(ir_utils, 'lookup_node', autospec=True)
class TestRecordNode(test_base.NodeTest):
class TestRecordNode(test_base.NodeTestBase):
def setUp(self):
super(TestRecordNode, self).setUp()
self.node.provision_state = 'active'

View File

@ -18,7 +18,8 @@ import fixtures
from oslo_config import cfg
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector import db
from ironic_inspector.db import api as db
from ironic_inspector.db import model
from ironic_inspector import introspection_state as istate
from ironic_inspector.plugins import introspection_data
from ironic_inspector.test import base as test_base
@ -26,7 +27,7 @@ from ironic_inspector.test import base as test_base
CONF = cfg.CONF
class BaseTest(test_base.NodeTest):
class BaseTest(test_base.NodeTestBase):
data = {
'ipmi_address': '1.2.3.4',
'cpus': 2,
@ -75,18 +76,16 @@ class TestSwiftStore(BaseTest):
def _create_node(self):
session = db.get_writer_session()
with session.begin():
db.Node(uuid=self.node_info.uuid,
state=istate.States.starting).save(session)
model.Node(uuid=self.node_info.uuid,
state=istate.States.starting).save(session)
class TestDatabaseStore(BaseTest):
def setUp(self):
super(TestDatabaseStore, self).setUp()
self.driver = introspection_data.DatabaseStore()
session = db.get_writer_session()
with session.begin():
db.Node(uuid=self.node_info.uuid,
state=istate.States.starting).save(session)
db.create_node(uuid=self.node_info.uuid,
state=istate.States.starting)
def test_store_and_get_data(self):
self.driver.save(self.node_info.uuid, self.data)

View File

@ -29,7 +29,7 @@ from oslo_utils import uuidutils
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector.common import swift
from ironic_inspector import db
from ironic_inspector.db import api as db
from ironic_inspector import introspection_state as istate
from ironic_inspector import node_cache
from ironic_inspector.plugins import base as plugins_base
@ -43,7 +43,7 @@ from ironic_inspector import utils
CONF = cfg.CONF
class BaseTest(test_base.NodeTest):
class BaseTest(test_base.NodeTestBase):
def setUp(self):
super(BaseTest, self).setUp()
self.started_at = timeutils.utcnow()
@ -445,10 +445,10 @@ class TestProcessNode(BaseTest):
self.useFixture(fixtures.MockPatchObject(
eventlet.greenthread, 'sleep', autospec=True))
self.node_info._state = istate.States.waiting
db.Node(uuid=self.node_info.uuid, state=self.node_info._state,
started_at=self.node_info.started_at,
finished_at=self.node_info.finished_at,
error=self.node_info.error).save(self.session)
db.create_node(uuid=self.node_info.uuid, state=self.node_info._state,
started_at=self.node_info.started_at,
finished_at=self.node_info.finished_at,
error=self.node_info.error)
def test_return_includes_uuid(self):
ret_val = process._process_node(self.node_info, self.node, self.data)
@ -594,7 +594,7 @@ class TestProcessNode(BaseTest):
self.assertNotIn('logs',
json.loads(swift_conn.create_object.call_args[0][1]))
@mock.patch.object(node_cache, 'store_introspection_data', autospec=True)
@mock.patch.object(db, 'store_introspection_data', autospec=True)
def test_store_data_with_database(self, store_mock):
CONF.set_override('store_data', 'database', 'processing')
@ -604,7 +604,7 @@ class TestProcessNode(BaseTest):
store_mock.assert_called_once_with(self.node_info.uuid, data, True)
self.assertEqual(data, store_mock.call_args[0][1])
@mock.patch.object(node_cache, 'store_introspection_data', autospec=True)
@mock.patch.object(db, 'store_introspection_data', autospec=True)
def test_store_data_no_logs_with_database(self, store_mock):
CONF.set_override('store_data', 'database', 'processing')
@ -698,10 +698,10 @@ class TestReapplyNode(BaseTest):
self.commit_fixture = self.useFixture(
fixtures.MockPatchObject(node_cache.NodeInfo, 'commit',
autospec=True))
db.Node(uuid=self.node_info.uuid, state=self.node_info._state,
started_at=self.node_info.started_at,
finished_at=self.node_info.finished_at,
error=self.node_info.error).save(self.session)
db.create_node(uuid=self.node_info.uuid, state=self.node_info._state,
started_at=self.node_info.started_at,
finished_at=self.node_info.finished_at,
error=self.node_info.error)
def call(self):
process._reapply(self.node_info, introspection_data=self.data)

View File

@ -17,7 +17,7 @@ from unittest import mock
from oslo_utils import uuidutils
from ironic_inspector import db
from ironic_inspector.db import api as db
from ironic_inspector.plugins import base as plugins_base
from ironic_inspector import rules
from ironic_inspector.test import base as test_base
@ -211,11 +211,9 @@ class TestDeleteRule(BaseTest):
def test_delete(self):
rules.delete(self.uuid)
self.assertEqual([(self.uuid2,)], db.model_query(db.Rule.uuid).all())
self.assertFalse(db.model_query(db.RuleCondition)
.filter_by(rule=self.uuid).all())
self.assertFalse(db.model_query(db.RuleAction)
.filter_by(rule=self.uuid).all())
self.assertEqual([self.uuid2], [r.uuid for r in db.get_rules()])
self.assertFalse(db.get_rules_conditions(rule=self.uuid))
self.assertFalse(db.get_rules_actions(rule=self.uuid))
def test_delete_non_existing(self):
self.assertRaises(utils.Error, rules.delete, 'foo')
@ -223,9 +221,9 @@ class TestDeleteRule(BaseTest):
def test_delete_all(self):
rules.delete_all()
self.assertFalse(db.model_query(db.Rule).all())
self.assertFalse(db.model_query(db.RuleCondition).all())
self.assertFalse(db.model_query(db.RuleAction).all())
self.assertFalse(db.get_rules())
self.assertFalse(db.get_rules_conditions())
self.assertFalse(db.get_rules_actions())
@mock.patch.object(plugins_base, 'rule_conditions_manager', autospec=True)

View File

@ -26,6 +26,7 @@ CONF = cfg.CONF
class TestCheckAuth(base.BaseTest):
def setUp(self):
super(TestCheckAuth, self).setUp()
self.cfg.config(auth_strategy='keystone')

View File

@ -208,6 +208,32 @@ class DeferredBasicAuthMiddleware(object):
return req.get_response(self.app)
class NodeNotFoundInDBError(Error):
"""The node was not found in the database."""
    # NOTE(TheJulia): This exception exists largely to facilitate
    # internal error handling.
def __init__(self, **kwargs):
msg = 'The requested node was not found.'
super(NodeNotFoundInDBError, self).__init__(
msg, code=404, log_level='error', **kwargs)
class RuleUUIDExistError(Error):
"""Rule requested already exists in the database."""
def __init__(self, uuid, *args, **kwargs):
message = _('Rule with UUID %s already exists') % uuid
kwargs.setdefault('code', 409)
super(RuleUUIDExistError, self).__init__(message, *args, **kwargs)
class RuleNotFoundError(Error):
"""The requested rule was not found."""
def __init__(self, uuid, *args, **kwargs):
message = _('Rule %s was not found') % uuid
kwargs.setdefault('code', 404)
super(RuleNotFoundError, self).__init__(message, *args, **kwargs)
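For context, ``NodeNotFoundInDBError`` is what the database API raises when a node lookup misses, as exercised by ``test_get_node_by_uuid_not_found`` above. A minimal sketch of that usage pattern (an illustrative helper, not the actual implementation):

from ironic_inspector.db import model
from ironic_inspector import utils


def get_node_or_raise(uuid, session):
    # Translate a missed lookup into the dedicated 404-style error.
    res = session.query(model.Node).filter_by(uuid=uuid).first()
    if res is None:
        raise utils.NodeNotFoundInDBError()
    return res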
def executor():
"""Return the current futures executor."""
global _EXECUTOR

View File

@ -0,0 +1,37 @@
---
upgrade:
  - |
    The minimum version of SQLAlchemy is now ``1.4.0``, in preparation for
    the anticipated release of SQLAlchemy ``2.0.0``.
  - |
    The minimum version of oslo.db is now ``12.1.0``, in preparation for
    the anticipated release of SQLAlchemy ``2.0.0``.
  - |
    Database schema upgrades from versions prior to ``7.3.0`` are not
    supported. Please upgrade to an intermediate release before upgrading
    to this release.
fixes:
  - |
    Fixes an issue where database query results for nodes would be orphaned
    in the inspector process RAM and never garbage collected. The issue was
    discovered and reproduced while investigating database connectivity
    locks that remained in place.
    Please see `story 2009727 <https://storyboard.openstack.org/#!/story/2009727>`_
    for more details.
deprecations:
  - |
    Plugin maintainers should be aware that the node cache object field
    ``version_id`` is no longer in use. It is still returned by the data
    model, if stored, for compatibility purposes, but Inspector will not
    update the field in the normal course of its operation.
other:
  - |
    Plugin maintainers who work directly with the database will need to
    update their plugins. Specifically, the database API has been
    restructured around ``enginefacade`` with a dedicated reader and
    writer model, in anticipation of SQLAlchemy 2.0 support and an
    eventual merge of Inspector into Ironic. Database actions are now
    performed through the ``ironic_inspector.db.api`` module, where
    previously they were spread across ``ironic_inspector.db`` and
    ``ironic_inspector.node_cache``.
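As a concrete illustration of that split, a plugin that previously opened a writer session and saved model objects by hand can now call the API module directly; a sketch based on the patterns visible in this change (``node_uuid`` is a placeholder):

from ironic_inspector.db import api as dbapi

node_uuid = '<node uuid>'  # placeholder

# Before: the plugin owned the session, e.g.
#   session = db.get_writer_session()
#   with session.begin():
#       db.Option(uuid=node_uuid, name='foo', value='bar').save(session)
# After: the db api owns the session and the transaction scope.
dbapi.set_option(node_uuid, 'foo', 'bar')
options = dbapi.list_nodes_options_by_uuid(node_uuid)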

View File

@ -24,7 +24,7 @@ openstacksdk>=0.40.0 # Apache-2.0
oslo.concurrency>=3.26.0 # Apache-2.0
oslo.config>=6.8.0 # Apache-2.0
oslo.context>=2.22.0 # Apache-2.0
oslo.db>=6.0.0 # Apache-2.0
oslo.db>=12.1.0 # Apache-2.0
oslo.i18n>=3.20.0 # Apache-2.0
oslo.log>=4.3.0 # Apache-2.0
oslo.messaging>=5.32.0 # Apache-2.0
@ -37,5 +37,5 @@ oslo.upgradecheck>=1.2.0 # Apache-2.0
oslo.utils>=4.5.0 # Apache-2.0
tenacity>=6.2.0 # Apache-2.0
stevedore>=1.20.0 # Apache-2.0
SQLAlchemy>=1.2.19 # MIT
SQLAlchemy>=1.4.0 # MIT
tooz>=2.5.1 # Apache-2.0

View File

@ -16,6 +16,8 @@ commands =
setenv =
VIRTUAL_ENV={envdir}
PYTHONDONTWRITEBYTECODE=1
PYTHONUNBUFFERED=1
SQLALCHEMY_WARN_20=true
TZ=UTC
TESTS_DIR=./ironic_inspector/test/unit/
passenv = http_proxy,HTTP_PROXY,https_proxy,HTTPS_PROXY,no_proxy,NO_PROXY
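``SQLALCHEMY_WARN_20=true`` makes SQLAlchemy 1.4 emit ``RemovedIn20Warning`` for any usage that 2.0 removes. To make such warnings fail the run outright, one possible addition (hypothetical, not part of this change) is a filter in the test bootstrap:

import warnings

from sqlalchemy import exc as sa_exc

# Escalate SQLAlchemy 2.0 deprecation warnings to hard errors; this
# only has an effect when SQLALCHEMY_WARN_20 is set in the environment
# before SQLAlchemy is imported.
warnings.filterwarnings('error', category=sa_exc.RemovedIn20Warning)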