diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 98ebb30d..5796e273 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -28,13 +28,13 @@ from oslo_utils import strutils import six import sqlalchemy as sa from sqlalchemy import exc as sa_exc -from sqlalchemy import orm as sa_orm from sqlalchemy import pool as sa_pool +from sqlalchemy import sql from taskflow import exceptions as exc from taskflow import logging from taskflow.persistence.backends.sqlalchemy import migration -from taskflow.persistence.backends.sqlalchemy import models +from taskflow.persistence.backends.sqlalchemy import tables from taskflow.persistence import base from taskflow.persistence import logbook from taskflow.types import failure @@ -180,6 +180,50 @@ def _ping_listener(dbapi_conn, connection_rec, connection_proxy): raise +class Alchemist(object): + """Internal <-> external row <-> objects + other helper functions. + + NOTE(harlowja): for internal usage only. + """ + def __init__(self, tables): + self._tables = tables + + @staticmethod + def convert_flow_detail(row): + return logbook.FlowDetail.from_dict(dict(row.items())) + + @staticmethod + def convert_book(row): + return logbook.LogBook.from_dict(dict(row.items())) + + @staticmethod + def convert_atom_detail(row): + row = dict(row.items()) + atom_cls = logbook.atom_detail_class(row.pop('atom_type')) + return atom_cls.from_dict(row) + + def _atom_query_iter(self, conn, parent_uuid): + q = (sql.select([self._tables.atomdetails]). + where(self._tables.atomdetails.c.parent_uuid == parent_uuid)) + for row in conn.execute(q): + yield self.convert_atom_detail(row) + + def _flow_query_iter(self, conn, parent_uuid): + q = (sql.select([self._tables.flowdetails]). + where(self._tables.flowdetails.c.parent_uuid == parent_uuid)) + for row in conn.execute(q): + yield self.convert_flow_detail(row) + + def populate_book(self, conn, book): + for fd in self._flow_query_iter(conn, book.uuid): + book.add(fd) + self.populate_flow_detail(conn, fd) + + def populate_flow_detail(self, conn, fd): + for ad in self._atom_query_iter(conn, fd.uuid): + fd.add(ad) + + class SQLAlchemyBackend(base.Backend): """A sqlalchemy backend. @@ -197,7 +241,6 @@ class SQLAlchemyBackend(base.Backend): else: self._engine = None self._owns_engine = True - self._session_maker = None self._validated = False def _create_engine(self): @@ -270,14 +313,8 @@ class SQLAlchemyBackend(base.Backend): self._engine = self._create_engine() return self._engine - def _get_session_maker(self): - if self._session_maker is None: - self._session_maker = sa_orm.sessionmaker(bind=self.engine, - autocommit=True) - return self._session_maker - def get_connection(self): - conn = Connection(self, self._get_session_maker()) + conn = Connection(self) if not self._validated: try: max_retries = misc.as_int(self._conf.get('max_retries', None)) @@ -288,9 +325,6 @@ class SQLAlchemyBackend(base.Backend): return conn def close(self): - if self._session_maker is not None: - self._session_maker.close_all() - self._session_maker = None if self._engine is not None and self._owns_engine: # NOTE(harlowja): Only dispose of the engine and clear it from # our local state if we actually own the engine in the first @@ -304,10 +338,12 @@ class SQLAlchemyBackend(base.Backend): class Connection(base.Connection): - def __init__(self, backend, session_maker): + def __init__(self, backend): self._backend = backend - self._session_maker = session_maker self._engine = backend.engine + self._metadata = sa.MetaData() + self._tables = tables.fetch(self._metadata) + self._converter = Alchemist(self._tables) @property def backend(self): @@ -315,7 +351,7 @@ class Connection(base.Connection): def validate(self, max_retries=0): - def test_connect(failures): + def verify_connect(failures): try: # See if we can make a connection happen. # @@ -332,7 +368,7 @@ class Connection(base.Connection): return True failures = [] - if test_connect(failures): + if verify_connect(failures): return # Sorry it didn't work out... @@ -349,40 +385,13 @@ class Connection(base.Connection): LOG.info("Attempting to test the connection again in %s seconds.", sleepy_secs) time.sleep(sleepy_secs) - if test_connect(failures): + if verify_connect(failures): return attempts_left -= 1 # Sorry it didn't work out... failures[-1].reraise() - def _run_in_session(self, functor, *args, **kwargs): - """Runs a callback in a session. - - This function proxy will create a session, and then call the callback - with that session (along with the provided args and kwargs). It ensures - that the session is opened & closed and makes sure that sqlalchemy - exceptions aren't emitted from the callback or sessions actions (as - that would expose the underlying sqlalchemy exception model). - """ - try: - session = self._make_session() - with session.begin(): - return functor(session, *args, **kwargs) - except sa_exc.SQLAlchemyError as e: - LOG.exception("Failed running '%s' within a database session", - functor.__name__) - raise exc.StorageFailure("Storage backend internal error, failed" - " running '%s' within a database" - " session" % functor.__name__, e) - - def _make_session(self): - try: - return self._session_maker() - except sa_exc.SQLAlchemyError as e: - LOG.exception('Failed creating database session') - raise exc.StorageFailure("Failed creating database session", e) - def upgrade(self): try: with contextlib.closing(self._engine.connect()) as conn: @@ -390,237 +399,164 @@ class Connection(base.Connection): # and we don't recommend to use SQLite in production # deployments, so migrations are rarely needed # for SQLite. So we don't bother about working around - # SQLite limitations, and create database from models - # when it is in use. + # SQLite limitations, and create the database directly from + # the tables when it is in use... if 'sqlite' in self._engine.url.drivername: - models.BASE.metadata.create_all(conn) + self._metadata.create_all(bind=conn) else: migration.db_sync(conn) except sa_exc.SQLAlchemyError as e: - LOG.exception('Failed upgrading database version') raise exc.StorageFailure("Failed upgrading database version", e) - def _clear_all(self, session): - # NOTE(harlowja): due to how we have our relationship setup and - # cascading deletes are enabled, this will cause all associated - # task details and flow details to automatically be purged. + def clear_all(self): try: - return session.query(models.LogBook).delete() + logbooks = self._tables.logbooks + with self._engine.begin() as conn: + conn.execute(logbooks.delete()) except sa_exc.DBAPIError as e: - LOG.exception('Failed clearing all entries') raise exc.StorageFailure("Failed clearing all entries", e) - def clear_all(self): - return self._run_in_session(self._clear_all) - - def _update_atom_details(self, session, ad): - # Must already exist since a atoms details has a strong connection to - # a flow details, and atom details can not be saved on there own since - # they *must* have a connection to an existing flow detail. - ad_m = _atom_details_get_model(ad.uuid, session=session) - ad_m = _atomdetails_merge(ad_m, ad) - ad_m = session.merge(ad_m) - return _convert_ad_to_external(ad_m) - def update_atom_details(self, atom_detail): - return self._run_in_session(self._update_atom_details, ad=atom_detail) + try: + atomdetails = self._tables.atomdetails + with self._engine.begin() as conn: + q = (sql.select([atomdetails]). + where(atomdetails.c.uuid == atom_detail.uuid)) + row = conn.execute(q).first() + if not row: + raise exc.NotFound("No atom details found with uuid" + " '%s'" % atom_detail.uuid) + e_ad = self._converter.convert_atom_detail(row) + self._update_atom_details(conn, atom_detail, e_ad) + return e_ad + except sa_exc.SQLAlchemyError as e: + raise exc.StorageFailure("Failed updating atom details with" + " uuid '%s'" % atom_detail.uuid, e) - def _update_flow_details(self, session, fd): - # Must already exist since a flow details has a strong connection to - # a logbook, and flow details can not be saved on there own since they - # *must* have a connection to an existing logbook. - fd_m = _flow_details_get_model(fd.uuid, session=session) - fd_m = _flowdetails_merge(fd_m, fd) - fd_m = session.merge(fd_m) - return _convert_fd_to_external(fd_m) + def _insert_flow_details(self, conn, fd, parent_uuid): + value = fd.to_dict() + value['parent_uuid'] = parent_uuid + conn.execute(sql.insert(self._tables.flowdetails, value)) + for ad in fd: + self._insert_atom_details(conn, ad, fd.uuid) + + def _insert_atom_details(self, conn, ad, parent_uuid): + value = ad.to_dict() + value['parent_uuid'] = parent_uuid + value['atom_type'] = logbook.atom_detail_type(ad) + conn.execute(sql.insert(self._tables.atomdetails, value)) + + def _update_atom_details(self, conn, ad, e_ad): + e_ad.merge(ad) + conn.execute(sql.update(self._tables.atomdetails) + .where(self._tables.atomdetails.c.uuid == e_ad.uuid) + .values(e_ad.to_dict())) + + def _update_flow_details(self, conn, fd, e_fd): + e_fd.merge(fd) + conn.execute(sql.update(self._tables.flowdetails) + .where(self._tables.flowdetails.c.uuid == e_fd.uuid) + .values(e_fd.to_dict())) + for ad in fd: + e_ad = e_fd.find(ad.uuid) + if e_ad is None: + e_fd.add(ad) + self._insert_atom_details(conn, ad, fd.uuid) + else: + self._update_atom_details(conn, ad, e_ad) def update_flow_details(self, flow_detail): - return self._run_in_session(self._update_flow_details, fd=flow_detail) - - def _destroy_logbook(self, session, lb_id): try: - lb = _logbook_get_model(lb_id, session=session) - session.delete(lb) - except sa_exc.DBAPIError as e: - LOG.exception('Failed destroying logbook') - raise exc.StorageFailure("Failed destroying logbook %s" % lb_id, e) + flowdetails = self._tables.flowdetails + with self._engine.begin() as conn: + q = (sql.select([flowdetails]). + where(flowdetails.c.uuid == flow_detail.uuid)) + row = conn.execute(q).first() + if not row: + raise exc.NotFound("No flow details found with" + " uuid '%s'" % flow_detail.uuid) + e_fd = self._converter.convert_flow_detail(row) + self._converter.populate_flow_detail(conn, e_fd) + self._update_flow_details(conn, flow_detail, e_fd) + return e_fd + except sa_exc.SQLAlchemyError as e: + raise exc.StorageFailure("Failed updating flow details with" + " uuid '%s'" % flow_detail.uuid, e) def destroy_logbook(self, book_uuid): - return self._run_in_session(self._destroy_logbook, lb_id=book_uuid) - - def _save_logbook(self, session, lb): try: - lb_m = _logbook_get_model(lb.uuid, session=session) - lb_m = _logbook_merge(lb_m, lb) - except exc.NotFound: - lb_m = _convert_lb_to_internal(lb) - try: - lb_m = session.merge(lb_m) - return _convert_lb_to_external(lb_m) + logbooks = self._tables.logbooks + with self._engine.begin() as conn: + q = logbooks.delete().where(logbooks.c.uuid == book_uuid) + r = conn.execute(q) + if r.rowcount == 0: + raise exc.NotFound("No logbook found with" + " uuid '%s'" % book_uuid) except sa_exc.DBAPIError as e: - LOG.exception('Failed saving logbook') - raise exc.StorageFailure("Failed saving logbook %s" % lb.uuid, e) + raise exc.StorageFailure("Failed destroying" + " logbook '%s'" % book_uuid, e) def save_logbook(self, book): - return self._run_in_session(self._save_logbook, lb=book) + try: + logbooks = self._tables.logbooks + with self._engine.begin() as conn: + q = (sql.select([logbooks]). + where(logbooks.c.uuid == book.uuid)) + row = conn.execute(q).first() + if row: + e_lb = self._converter.convert_book(row) + self._converter.populate_book(conn, e_lb) + e_lb.merge(book) + conn.execute(sql.update(logbooks) + .where(logbooks.c.uuid == e_lb.uuid) + .values(e_lb.to_dict())) + for fd in book: + e_fd = e_lb.find(fd.uuid) + if e_fd is None: + e_lb.add(fd) + self._insert_flow_details(conn, fd, e_lb.uuid) + else: + self._update_flow_details(conn, fd, e_fd) + return e_lb + else: + conn.execute(sql.insert(logbooks, book.to_dict())) + for fd in book: + self._insert_flow_details(conn, fd, book.uuid) + return book + except sa_exc.DBAPIError as e: + raise exc.StorageFailure("Failed saving logbook" + " '%s'" % book.uuid, e) def get_logbook(self, book_uuid): - session = self._make_session() try: - lb = _logbook_get_model(book_uuid, session=session) - return _convert_lb_to_external(lb) + logbooks = self._tables.logbooks + with contextlib.closing(self._engine.connect()) as conn: + q = (sql.select([logbooks]). + where(logbooks.c.uuid == book_uuid)) + row = conn.execute(q).first() + if not row: + raise exc.NotFound("No logbook found with" + " uuid '%s'" % book_uuid) + book = self._converter.convert_book(row) + self._converter.populate_book(conn, book) + return book except sa_exc.DBAPIError as e: - LOG.exception('Failed getting logbook') - raise exc.StorageFailure("Failed getting logbook %s" % book_uuid, - e) + raise exc.StorageFailure( + "Failed getting logbook '%s'" % book_uuid, e) def get_logbooks(self): - session = self._make_session() + gathered = [] try: - raw_books = session.query(models.LogBook).all() - books = [_convert_lb_to_external(lb) for lb in raw_books] + with contextlib.closing(self._engine.connect()) as conn: + q = sql.select([self._tables.logbooks]) + for row in conn.execute(q): + book = self._converter.convert_book(row) + self._converter.populate_book(conn, book) + gathered.append(book) except sa_exc.DBAPIError as e: - LOG.exception('Failed getting logbooks') raise exc.StorageFailure("Failed getting logbooks", e) - for lb in books: - yield lb + for book in gathered: + yield book def close(self): pass - -### -# Internal <-> external model + merging + other helper functions. -### - - -def _atomdetails_merge(ad_m, ad): - atom_type = logbook.atom_detail_type(ad) - if atom_type != ad_m.atom_type: - raise exc.StorageFailure("Can not merge differing atom types " - "(%s != %s)" % (atom_type, ad_m.atom_type)) - ad_d = ad.to_dict() - ad_m.state = ad_d['state'] - ad_m.intention = ad_d['intention'] - ad_m.results = ad_d['results'] - ad_m.version = ad_d['version'] - ad_m.failure = ad_d['failure'] - ad_m.meta = ad_d['meta'] - ad_m.name = ad_d['name'] - return ad_m - - -def _flowdetails_merge(fd_m, fd): - fd_d = fd.to_dict() - fd_m.state = fd_d['state'] - fd_m.name = fd_d['name'] - fd_m.meta = fd_d['meta'] - for ad in fd: - existing_ad = False - for ad_m in fd_m.atomdetails: - if ad_m.uuid == ad.uuid: - existing_ad = True - ad_m = _atomdetails_merge(ad_m, ad) - break - if not existing_ad: - ad_m = _convert_ad_to_internal(ad, fd_m.uuid) - fd_m.atomdetails.append(ad_m) - return fd_m - - -def _logbook_merge(lb_m, lb): - lb_d = lb.to_dict() - lb_m.meta = lb_d['meta'] - lb_m.name = lb_d['name'] - lb_m.created_at = lb_d['created_at'] - lb_m.updated_at = lb_d['updated_at'] - for fd in lb: - existing_fd = False - for fd_m in lb_m.flowdetails: - if fd_m.uuid == fd.uuid: - existing_fd = True - fd_m = _flowdetails_merge(fd_m, fd) - if not existing_fd: - lb_m.flowdetails.append(_convert_fd_to_internal(fd, lb_m.uuid)) - return lb_m - - -def _convert_fd_to_external(fd): - fd_c = logbook.FlowDetail(fd.name, uuid=fd.uuid) - fd_c.meta = fd.meta - fd_c.state = fd.state - for ad_m in fd.atomdetails: - fd_c.add(_convert_ad_to_external(ad_m)) - return fd_c - - -def _convert_fd_to_internal(fd, parent_uuid): - fd_m = models.FlowDetail(name=fd.name, uuid=fd.uuid, - parent_uuid=parent_uuid, meta=fd.meta, - state=fd.state) - fd_m.atomdetails = [] - for ad in fd: - fd_m.atomdetails.append(_convert_ad_to_internal(ad, fd_m.uuid)) - return fd_m - - -def _convert_ad_to_internal(ad, parent_uuid): - converted = ad.to_dict() - converted['atom_type'] = logbook.atom_detail_type(ad) - converted['parent_uuid'] = parent_uuid - return models.AtomDetail(**converted) - - -def _convert_ad_to_external(ad): - # Convert from sqlalchemy model -> external model, this allows us - # to change the internal sqlalchemy model easily by forcing a defined - # interface (that isn't the sqlalchemy model itself). - atom_cls = logbook.atom_detail_class(ad.atom_type) - return atom_cls.from_dict({ - 'state': ad.state, - 'intention': ad.intention, - 'results': ad.results, - 'failure': ad.failure, - 'meta': ad.meta, - 'version': ad.version, - 'name': ad.name, - 'uuid': ad.uuid, - }) - - -def _convert_lb_to_external(lb_m): - lb_c = logbook.LogBook(lb_m.name, lb_m.uuid) - lb_c.updated_at = lb_m.updated_at - lb_c.created_at = lb_m.created_at - lb_c.meta = lb_m.meta - for fd_m in lb_m.flowdetails: - lb_c.add(_convert_fd_to_external(fd_m)) - return lb_c - - -def _convert_lb_to_internal(lb_c): - lb_m = models.LogBook(uuid=lb_c.uuid, meta=lb_c.meta, name=lb_c.name) - lb_m.flowdetails = [] - for fd_c in lb_c: - lb_m.flowdetails.append(_convert_fd_to_internal(fd_c, lb_c.uuid)) - return lb_m - - -def _logbook_get_model(lb_id, session): - entry = session.query(models.LogBook).filter_by(uuid=lb_id).first() - if entry is None: - raise exc.NotFound("No logbook found with id: %s" % lb_id) - return entry - - -def _flow_details_get_model(flow_id, session): - entry = session.query(models.FlowDetail).filter_by(uuid=flow_id).first() - if entry is None: - raise exc.NotFound("No flow details found with id: %s" % flow_id) - return entry - - -def _atom_details_get_model(atom_id, session): - entry = session.query(models.AtomDetail).filter_by(uuid=atom_id).first() - if entry is None: - raise exc.NotFound("No atom details found with id: %s" % atom_id) - return entry diff --git a/taskflow/persistence/backends/sqlalchemy/models.py b/taskflow/persistence/backends/sqlalchemy/models.py deleted file mode 100644 index e43c9652..00000000 --- a/taskflow/persistence/backends/sqlalchemy/models.py +++ /dev/null @@ -1,97 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_serialization import jsonutils -from oslo_utils import timeutils -from oslo_utils import uuidutils -from sqlalchemy import Column, String, DateTime, Enum -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy import ForeignKey -from sqlalchemy.orm import backref -from sqlalchemy.orm import relationship -from sqlalchemy import types as types - -from taskflow.persistence import logbook -from taskflow import states - -BASE = declarative_base() - - -# TODO(harlowja): remove when oslo.db exists -class TimestampMixin(object): - created_at = Column(DateTime, default=timeutils.utcnow) - updated_at = Column(DateTime, onupdate=timeutils.utcnow) - - -class Json(types.TypeDecorator): - impl = types.Text - - def process_bind_param(self, value, dialect): - return jsonutils.dumps(value) - - def process_result_value(self, value, dialect): - return jsonutils.loads(value) - - -class ModelBase(TimestampMixin): - """Base model for all taskflow objects.""" - uuid = Column(String, default=uuidutils.generate_uuid, - primary_key=True, nullable=False, unique=True) - name = Column(String, nullable=True) - meta = Column(Json, nullable=True) - - -class LogBook(BASE, ModelBase): - """Represents a logbook for a set of flows.""" - __tablename__ = 'logbooks' - - # Relationships - flowdetails = relationship("FlowDetail", - single_parent=True, - backref=backref("logbooks", - cascade="save-update, delete, " - "merge")) - - -class FlowDetail(BASE, ModelBase): - __tablename__ = 'flowdetails' - - # Member variables - state = Column(String) - - # Relationships - parent_uuid = Column(String, ForeignKey('logbooks.uuid')) - atomdetails = relationship("AtomDetail", - single_parent=True, - backref=backref("flowdetails", - cascade="save-update, delete, " - "merge")) - - -class AtomDetail(BASE, ModelBase): - __tablename__ = 'atomdetails' - - # Member variables - atom_type = Column(Enum(*logbook.ATOM_TYPES, name='atom_types')) - state = Column(String) - intention = Column(Enum(*states.INTENTIONS, name='intentions')) - results = Column(Json) - failure = Column(Json) - version = Column(Json) - - # Relationships - parent_uuid = Column(String, ForeignKey('flowdetails.uuid')) diff --git a/taskflow/persistence/backends/sqlalchemy/tables.py b/taskflow/persistence/backends/sqlalchemy/tables.py new file mode 100644 index 00000000..239e6430 --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/tables.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections + +from oslo.serialization import jsonutils +from oslo.utils import timeutils +from oslo.utils import uuidutils +from sqlalchemy import Table, Column, String, ForeignKey, DateTime, Enum +from sqlalchemy import types + +from taskflow.persistence import logbook +from taskflow import states + +Tables = collections.namedtuple('Tables', + ['logbooks', 'flowdetails', 'atomdetails']) + +# Column length limits... +NAME_LENGTH = 255 +UUID_LENGTH = 64 +STATE_LENGTH = 255 +VERSION_LENGTH = 64 + + +class Json(types.TypeDecorator): + impl = types.Text + + def process_bind_param(self, value, dialect): + if not value: + return None + return jsonutils.dumps(value) + + def process_result_value(self, value, dialect): + if not value: + return None + return jsonutils.loads(value) + + +def fetch(metadata): + """Returns the master set of table objects (which is also there schema).""" + logbooks = Table('logbooks', metadata, + Column('created_at', DateTime, + default=timeutils.utcnow), + Column('updated_at', DateTime, + default=timeutils.utcnow), + Column('meta', Json), + Column('name', String(length=NAME_LENGTH)), + Column('uuid', String(length=UUID_LENGTH), + primary_key=True, nullable=False, unique=True, + default=uuidutils.generate_uuid)) + flowdetails = Table('flowdetails', metadata, + Column('created_at', DateTime, + default=timeutils.utcnow), + Column('updated_at', DateTime, + default=timeutils.utcnow), + Column('parent_uuid', String(length=UUID_LENGTH), + ForeignKey('logbooks.uuid', + ondelete='CASCADE')), + Column('meta', Json), + Column('name', String(length=NAME_LENGTH)), + Column('state', String(length=STATE_LENGTH)), + Column('uuid', String(length=UUID_LENGTH), + primary_key=True, nullable=False, unique=True, + default=uuidutils.generate_uuid)) + atomdetails = Table('atomdetails', metadata, + Column('created_at', DateTime, + default=timeutils.utcnow), + Column('updated_at', DateTime, + default=timeutils.utcnow), + Column('meta', Json), + Column('parent_uuid', String(length=UUID_LENGTH), + ForeignKey('flowdetails.uuid', + ondelete='CASCADE')), + Column('name', String(length=NAME_LENGTH)), + Column('version', String(length=VERSION_LENGTH)), + Column('state', String(length=STATE_LENGTH)), + Column('uuid', String(length=UUID_LENGTH), + primary_key=True, nullable=False, unique=True, + default=uuidutils.generate_uuid), + Column('failure', Json), + Column('results', Json), + Column('atom_type', Enum(*logbook.ATOM_TYPES, + name='atom_types')), + Column('intention', Enum(*states.INTENTIONS, + name='intentions'))) + return Tables(logbooks, flowdetails, atomdetails)