Rework the sqlalchemy backend

We can simplify our usage of sqlalchemy by just using sqlalchemy
core in the first place (we already have our own ORM-like objects
anyway) and having a very thin layer that converts back and forth
between it and the very limited object model that we use in our
persistence layer.

This change makes that adjustment, which makes it easier to read
and understand what the sqlalchemy backend is doing when saving,
reading and updating data, and avoids yet another layer that isn't
useful for our purposes anyway.
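
To illustrate the flavor of the change — a minimal sketch only, with a
simplified stand-in table and names rather than taskflow's actual objects:

    import sqlalchemy as sa

    # Core style (what this change moves to): explicit tables and
    # statements, no sessions and no mapped classes.
    metadata = sa.MetaData()
    books = sa.Table('logbooks', metadata,
                     sa.Column('uuid', sa.String(64), primary_key=True),
                     sa.Column('name', sa.String(255)))
    engine = sa.create_engine('sqlite://')
    metadata.create_all(engine)
    with engine.begin() as conn:
        conn.execute(books.insert(), {'uuid': 'b1', 'name': 'demo'})
        row = conn.execute(sa.select([books])).first()
        book = dict(row.items())  # tiny conversion layer: row -> our object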

Change-Id: I911c509f65e7845aee86fed1622eaa56970741f2
Author: Joshua Harlow
Date:   2014-10-02 12:39:00 -07:00
Parent: 6924b3622a
Commit: 687ec91379

3 changed files with 281 additions and 343 deletions

taskflow/persistence/backends/impl_sqlalchemy.py

@@ -28,13 +28,13 @@ from oslo_utils import strutils
 import six
 import sqlalchemy as sa
 from sqlalchemy import exc as sa_exc
-from sqlalchemy import orm as sa_orm
 from sqlalchemy import pool as sa_pool
+from sqlalchemy import sql
 
 from taskflow import exceptions as exc
 from taskflow import logging
 from taskflow.persistence.backends.sqlalchemy import migration
-from taskflow.persistence.backends.sqlalchemy import models
+from taskflow.persistence.backends.sqlalchemy import tables
 from taskflow.persistence import base
 from taskflow.persistence import logbook
 from taskflow.types import failure
@@ -180,6 +180,50 @@ def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
         raise
 
 
+class Alchemist(object):
+    """Internal <-> external row <-> objects + other helper functions.
+
+    NOTE(harlowja): for internal usage only.
+    """
+
+    def __init__(self, tables):
+        self._tables = tables
+
+    @staticmethod
+    def convert_flow_detail(row):
+        return logbook.FlowDetail.from_dict(dict(row.items()))
+
+    @staticmethod
+    def convert_book(row):
+        return logbook.LogBook.from_dict(dict(row.items()))
+
+    @staticmethod
+    def convert_atom_detail(row):
+        row = dict(row.items())
+        atom_cls = logbook.atom_detail_class(row.pop('atom_type'))
+        return atom_cls.from_dict(row)
+
+    def _atom_query_iter(self, conn, parent_uuid):
+        q = (sql.select([self._tables.atomdetails]).
+             where(self._tables.atomdetails.c.parent_uuid == parent_uuid))
+        for row in conn.execute(q):
+            yield self.convert_atom_detail(row)
+
+    def _flow_query_iter(self, conn, parent_uuid):
+        q = (sql.select([self._tables.flowdetails]).
+             where(self._tables.flowdetails.c.parent_uuid == parent_uuid))
+        for row in conn.execute(q):
+            yield self.convert_flow_detail(row)
+
+    def populate_book(self, conn, book):
+        for fd in self._flow_query_iter(conn, book.uuid):
+            book.add(fd)
+            self.populate_flow_detail(conn, fd)
+
+    def populate_flow_detail(self, conn, fd):
+        for ad in self._atom_query_iter(conn, fd.uuid):
+            fd.add(ad)
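
As an aside, a minimal sketch of how this converter is meant to be driven
(hypothetical caller code; an engine and an already-created schema are
assumed, only convert_book/populate_book come from the class above):

    metadata = sa.MetaData()
    tbls = tables.fetch(metadata)
    alchemist = Alchemist(tbls)
    with engine.connect() as conn:
        row = conn.execute(sql.select([tbls.logbooks])).first()
        if row is not None:
            book = alchemist.convert_book(row)   # row -> logbook.LogBook
            alchemist.populate_book(conn, book)  # attach flow + atom details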
 
 
 class SQLAlchemyBackend(base.Backend):
     """A sqlalchemy backend.
 
@@ -197,7 +241,6 @@ class SQLAlchemyBackend(base.Backend):
         else:
             self._engine = None
             self._owns_engine = True
-        self._session_maker = None
         self._validated = False
 
     def _create_engine(self):
@@ -270,14 +313,8 @@ class SQLAlchemyBackend(base.Backend):
             self._engine = self._create_engine()
         return self._engine
 
-    def _get_session_maker(self):
-        if self._session_maker is None:
-            self._session_maker = sa_orm.sessionmaker(bind=self.engine,
-                                                      autocommit=True)
-        return self._session_maker
-
     def get_connection(self):
-        conn = Connection(self, self._get_session_maker())
+        conn = Connection(self)
         if not self._validated:
             try:
                 max_retries = misc.as_int(self._conf.get('max_retries', None))
@@ -288,9 +325,6 @@ class SQLAlchemyBackend(base.Backend):
         return conn
 
     def close(self):
-        if self._session_maker is not None:
-            self._session_maker.close_all()
-            self._session_maker = None
         if self._engine is not None and self._owns_engine:
             # NOTE(harlowja): Only dispose of the engine and clear it from
             # our local state if we actually own the engine in the first
@@ -304,10 +338,12 @@ class SQLAlchemyBackend(base.Backend):
 
 
 class Connection(base.Connection):
-    def __init__(self, backend, session_maker):
+    def __init__(self, backend):
         self._backend = backend
-        self._session_maker = session_maker
+        self._engine = backend.engine
+        self._metadata = sa.MetaData()
+        self._tables = tables.fetch(self._metadata)
+        self._converter = Alchemist(self._tables)
 
     @property
     def backend(self):
@@ -315,7 +351,7 @@ class Connection(base.Connection):
 
     def validate(self, max_retries=0):
 
-        def test_connect(failures):
+        def verify_connect(failures):
             try:
                 # See if we can make a connection happen.
                 #
@@ -332,7 +368,7 @@ class Connection(base.Connection):
                 return True
 
         failures = []
-        if test_connect(failures):
+        if verify_connect(failures):
             return
 
         # Sorry it didn't work out...
@@ -349,40 +385,13 @@ class Connection(base.Connection):
             LOG.info("Attempting to test the connection again in %s seconds.",
                      sleepy_secs)
             time.sleep(sleepy_secs)
-            if test_connect(failures):
+            if verify_connect(failures):
                 return
             attempts_left -= 1
 
         # Sorry it didn't work out...
         failures[-1].reraise()
 
-    def _run_in_session(self, functor, *args, **kwargs):
-        """Runs a callback in a session.
-
-        This function proxy will create a session, and then call the callback
-        with that session (along with the provided args and kwargs). It ensures
-        that the session is opened & closed and makes sure that sqlalchemy
-        exceptions aren't emitted from the callback or sessions actions (as
-        that would expose the underlying sqlalchemy exception model).
-        """
-        try:
-            session = self._make_session()
-            with session.begin():
-                return functor(session, *args, **kwargs)
-        except sa_exc.SQLAlchemyError as e:
-            LOG.exception("Failed running '%s' within a database session",
-                          functor.__name__)
-            raise exc.StorageFailure("Storage backend internal error, failed"
-                                     " running '%s' within a database"
-                                     " session" % functor.__name__, e)
-
-    def _make_session(self):
-        try:
-            return self._session_maker()
-        except sa_exc.SQLAlchemyError as e:
-            LOG.exception('Failed creating database session')
-            raise exc.StorageFailure("Failed creating database session", e)
     def upgrade(self):
         try:
             with contextlib.closing(self._engine.connect()) as conn:
@@ -390,237 +399,164 @@ class Connection(base.Connection):
                 # and we don't recommend to use SQLite in production
                 # deployments, so migrations are rarely needed
                 # for SQLite. So we don't bother about working around
-                # SQLite limitations, and create database from models
-                # when it is in use.
+                # SQLite limitations, and create the database directly from
+                # the tables when it is in use...
                 if 'sqlite' in self._engine.url.drivername:
-                    models.BASE.metadata.create_all(conn)
+                    self._metadata.create_all(bind=conn)
                 else:
                     migration.db_sync(conn)
         except sa_exc.SQLAlchemyError as e:
             LOG.exception('Failed upgrading database version')
             raise exc.StorageFailure("Failed upgrading database version", e)
-    def _clear_all(self, session):
-        # NOTE(harlowja): due to how we have our relationship setup and
-        # cascading deletes are enabled, this will cause all associated
-        # task details and flow details to automatically be purged.
+    def clear_all(self):
         try:
-            return session.query(models.LogBook).delete()
+            logbooks = self._tables.logbooks
+            with self._engine.begin() as conn:
+                conn.execute(logbooks.delete())
         except sa_exc.DBAPIError as e:
             LOG.exception('Failed clearing all entries')
             raise exc.StorageFailure("Failed clearing all entries", e)
-
-    def clear_all(self):
-        return self._run_in_session(self._clear_all)
-    def _update_atom_details(self, session, ad):
-        # Must already exist since a atoms details has a strong connection to
-        # a flow details, and atom details can not be saved on there own since
-        # they *must* have a connection to an existing flow detail.
-        ad_m = _atom_details_get_model(ad.uuid, session=session)
-        ad_m = _atomdetails_merge(ad_m, ad)
-        ad_m = session.merge(ad_m)
-        return _convert_ad_to_external(ad_m)
-
     def update_atom_details(self, atom_detail):
-        return self._run_in_session(self._update_atom_details, ad=atom_detail)
+        try:
+            atomdetails = self._tables.atomdetails
+            with self._engine.begin() as conn:
+                q = (sql.select([atomdetails]).
+                     where(atomdetails.c.uuid == atom_detail.uuid))
+                row = conn.execute(q).first()
+                if not row:
+                    raise exc.NotFound("No atom details found with uuid"
+                                       " '%s'" % atom_detail.uuid)
+                e_ad = self._converter.convert_atom_detail(row)
+                self._update_atom_details(conn, atom_detail, e_ad)
+                return e_ad
+        except sa_exc.SQLAlchemyError as e:
+            raise exc.StorageFailure("Failed updating atom details with"
+                                     " uuid '%s'" % atom_detail.uuid, e)
-    def _update_flow_details(self, session, fd):
-        # Must already exist since a flow details has a strong connection to
-        # a logbook, and flow details can not be saved on there own since they
-        # *must* have a connection to an existing logbook.
-        fd_m = _flow_details_get_model(fd.uuid, session=session)
-        fd_m = _flowdetails_merge(fd_m, fd)
-        fd_m = session.merge(fd_m)
-        return _convert_fd_to_external(fd_m)
+    def _insert_flow_details(self, conn, fd, parent_uuid):
+        value = fd.to_dict()
+        value['parent_uuid'] = parent_uuid
+        conn.execute(sql.insert(self._tables.flowdetails, value))
+        for ad in fd:
+            self._insert_atom_details(conn, ad, fd.uuid)
+
+    def _insert_atom_details(self, conn, ad, parent_uuid):
+        value = ad.to_dict()
+        value['parent_uuid'] = parent_uuid
+        value['atom_type'] = logbook.atom_detail_type(ad)
+        conn.execute(sql.insert(self._tables.atomdetails, value))
+
+    def _update_atom_details(self, conn, ad, e_ad):
+        e_ad.merge(ad)
+        conn.execute(sql.update(self._tables.atomdetails)
+                     .where(self._tables.atomdetails.c.uuid == e_ad.uuid)
+                     .values(e_ad.to_dict()))
+
+    def _update_flow_details(self, conn, fd, e_fd):
+        e_fd.merge(fd)
+        conn.execute(sql.update(self._tables.flowdetails)
+                     .where(self._tables.flowdetails.c.uuid == e_fd.uuid)
+                     .values(e_fd.to_dict()))
+        for ad in fd:
+            e_ad = e_fd.find(ad.uuid)
+            if e_ad is None:
+                e_fd.add(ad)
+                self._insert_atom_details(conn, ad, fd.uuid)
+            else:
+                self._update_atom_details(conn, ad, e_ad)
 
     def update_flow_details(self, flow_detail):
-        return self._run_in_session(self._update_flow_details, fd=flow_detail)
-
-    def _destroy_logbook(self, session, lb_id):
         try:
-            lb = _logbook_get_model(lb_id, session=session)
-            session.delete(lb)
-        except sa_exc.DBAPIError as e:
-            LOG.exception('Failed destroying logbook')
-            raise exc.StorageFailure("Failed destroying logbook %s" % lb_id, e)
+            flowdetails = self._tables.flowdetails
+            with self._engine.begin() as conn:
+                q = (sql.select([flowdetails]).
+                     where(flowdetails.c.uuid == flow_detail.uuid))
+                row = conn.execute(q).first()
+                if not row:
+                    raise exc.NotFound("No flow details found with"
+                                       " uuid '%s'" % flow_detail.uuid)
+                e_fd = self._converter.convert_flow_detail(row)
+                self._converter.populate_flow_detail(conn, e_fd)
+                self._update_flow_details(conn, flow_detail, e_fd)
+                return e_fd
+        except sa_exc.SQLAlchemyError as e:
+            raise exc.StorageFailure("Failed updating flow details with"
+                                     " uuid '%s'" % flow_detail.uuid, e)
     def destroy_logbook(self, book_uuid):
-        return self._run_in_session(self._destroy_logbook, lb_id=book_uuid)
-
-    def _save_logbook(self, session, lb):
         try:
-            lb_m = _logbook_get_model(lb.uuid, session=session)
-            lb_m = _logbook_merge(lb_m, lb)
-        except exc.NotFound:
-            lb_m = _convert_lb_to_internal(lb)
-        try:
-            lb_m = session.merge(lb_m)
-            return _convert_lb_to_external(lb_m)
+            logbooks = self._tables.logbooks
+            with self._engine.begin() as conn:
+                q = logbooks.delete().where(logbooks.c.uuid == book_uuid)
+                r = conn.execute(q)
+                if r.rowcount == 0:
+                    raise exc.NotFound("No logbook found with"
+                                       " uuid '%s'" % book_uuid)
         except sa_exc.DBAPIError as e:
-            LOG.exception('Failed saving logbook')
-            raise exc.StorageFailure("Failed saving logbook %s" % lb.uuid, e)
+            raise exc.StorageFailure("Failed destroying"
+                                     " logbook '%s'" % book_uuid, e)
 
     def save_logbook(self, book):
-        return self._run_in_session(self._save_logbook, lb=book)
+        try:
+            logbooks = self._tables.logbooks
+            with self._engine.begin() as conn:
+                q = (sql.select([logbooks]).
+                     where(logbooks.c.uuid == book.uuid))
+                row = conn.execute(q).first()
+                if row:
+                    e_lb = self._converter.convert_book(row)
+                    self._converter.populate_book(conn, e_lb)
+                    e_lb.merge(book)
+                    conn.execute(sql.update(logbooks)
+                                 .where(logbooks.c.uuid == e_lb.uuid)
+                                 .values(e_lb.to_dict()))
+                    for fd in book:
+                        e_fd = e_lb.find(fd.uuid)
+                        if e_fd is None:
+                            e_lb.add(fd)
+                            self._insert_flow_details(conn, fd, e_lb.uuid)
+                        else:
+                            self._update_flow_details(conn, fd, e_fd)
+                    return e_lb
+                else:
+                    conn.execute(sql.insert(logbooks, book.to_dict()))
+                    for fd in book:
+                        self._insert_flow_details(conn, fd, book.uuid)
+                    return book
+        except sa_exc.DBAPIError as e:
+            raise exc.StorageFailure("Failed saving logbook"
+                                     " '%s'" % book.uuid, e)
     def get_logbook(self, book_uuid):
-        session = self._make_session()
         try:
-            lb = _logbook_get_model(book_uuid, session=session)
-            return _convert_lb_to_external(lb)
+            logbooks = self._tables.logbooks
+            with contextlib.closing(self._engine.connect()) as conn:
+                q = (sql.select([logbooks]).
+                     where(logbooks.c.uuid == book_uuid))
+                row = conn.execute(q).first()
+                if not row:
+                    raise exc.NotFound("No logbook found with"
+                                       " uuid '%s'" % book_uuid)
+                book = self._converter.convert_book(row)
+                self._converter.populate_book(conn, book)
+                return book
         except sa_exc.DBAPIError as e:
-            LOG.exception('Failed getting logbook')
-            raise exc.StorageFailure("Failed getting logbook %s" % book_uuid,
-                                     e)
+            raise exc.StorageFailure(
+                "Failed getting logbook '%s'" % book_uuid, e)
     def get_logbooks(self):
-        session = self._make_session()
+        gathered = []
         try:
-            raw_books = session.query(models.LogBook).all()
-            books = [_convert_lb_to_external(lb) for lb in raw_books]
+            with contextlib.closing(self._engine.connect()) as conn:
+                q = sql.select([self._tables.logbooks])
+                for row in conn.execute(q):
+                    book = self._converter.convert_book(row)
+                    self._converter.populate_book(conn, book)
+                    gathered.append(book)
         except sa_exc.DBAPIError as e:
-            LOG.exception('Failed getting logbooks')
             raise exc.StorageFailure("Failed getting logbooks", e)
-        for lb in books:
-            yield lb
+        for book in gathered:
+            yield book
 
     def close(self):
         pass
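
Note the gather-then-yield shape of get_logbooks(): every row is converted
while the database connection is open, so the generator holds no connection
once it starts yielding. A caller can therefore safely do, e.g. (sketch,
reusing the backend from the earlier example):

    with contextlib.closing(backend.get_connection()) as conn:
        books = list(conn.get_logbooks())   # all database work happens here
    for book in books:                      # iteration needs no connection
        print(book.name)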
-
-
-###
-# Internal <-> external model + merging + other helper functions.
-###
-
-def _atomdetails_merge(ad_m, ad):
-    atom_type = logbook.atom_detail_type(ad)
-    if atom_type != ad_m.atom_type:
-        raise exc.StorageFailure("Can not merge differing atom types "
-                                 "(%s != %s)" % (atom_type, ad_m.atom_type))
-    ad_d = ad.to_dict()
-    ad_m.state = ad_d['state']
-    ad_m.intention = ad_d['intention']
-    ad_m.results = ad_d['results']
-    ad_m.version = ad_d['version']
-    ad_m.failure = ad_d['failure']
-    ad_m.meta = ad_d['meta']
-    ad_m.name = ad_d['name']
-    return ad_m
-
-
-def _flowdetails_merge(fd_m, fd):
-    fd_d = fd.to_dict()
-    fd_m.state = fd_d['state']
-    fd_m.name = fd_d['name']
-    fd_m.meta = fd_d['meta']
-    for ad in fd:
-        existing_ad = False
-        for ad_m in fd_m.atomdetails:
-            if ad_m.uuid == ad.uuid:
-                existing_ad = True
-                ad_m = _atomdetails_merge(ad_m, ad)
-                break
-        if not existing_ad:
-            ad_m = _convert_ad_to_internal(ad, fd_m.uuid)
-            fd_m.atomdetails.append(ad_m)
-    return fd_m
-
-
-def _logbook_merge(lb_m, lb):
-    lb_d = lb.to_dict()
-    lb_m.meta = lb_d['meta']
-    lb_m.name = lb_d['name']
-    lb_m.created_at = lb_d['created_at']
-    lb_m.updated_at = lb_d['updated_at']
-    for fd in lb:
-        existing_fd = False
-        for fd_m in lb_m.flowdetails:
-            if fd_m.uuid == fd.uuid:
-                existing_fd = True
-                fd_m = _flowdetails_merge(fd_m, fd)
-        if not existing_fd:
-            lb_m.flowdetails.append(_convert_fd_to_internal(fd, lb_m.uuid))
-    return lb_m
-
-
-def _convert_fd_to_external(fd):
-    fd_c = logbook.FlowDetail(fd.name, uuid=fd.uuid)
-    fd_c.meta = fd.meta
-    fd_c.state = fd.state
-    for ad_m in fd.atomdetails:
-        fd_c.add(_convert_ad_to_external(ad_m))
-    return fd_c
-
-
-def _convert_fd_to_internal(fd, parent_uuid):
-    fd_m = models.FlowDetail(name=fd.name, uuid=fd.uuid,
-                             parent_uuid=parent_uuid, meta=fd.meta,
-                             state=fd.state)
-    fd_m.atomdetails = []
-    for ad in fd:
-        fd_m.atomdetails.append(_convert_ad_to_internal(ad, fd_m.uuid))
-    return fd_m
-
-
-def _convert_ad_to_internal(ad, parent_uuid):
-    converted = ad.to_dict()
-    converted['atom_type'] = logbook.atom_detail_type(ad)
-    converted['parent_uuid'] = parent_uuid
-    return models.AtomDetail(**converted)
-
-
-def _convert_ad_to_external(ad):
-    # Convert from sqlalchemy model -> external model, this allows us
-    # to change the internal sqlalchemy model easily by forcing a defined
-    # interface (that isn't the sqlalchemy model itself).
-    atom_cls = logbook.atom_detail_class(ad.atom_type)
-    return atom_cls.from_dict({
-        'state': ad.state,
-        'intention': ad.intention,
-        'results': ad.results,
-        'failure': ad.failure,
-        'meta': ad.meta,
-        'version': ad.version,
-        'name': ad.name,
-        'uuid': ad.uuid,
-    })
-
-
-def _convert_lb_to_external(lb_m):
-    lb_c = logbook.LogBook(lb_m.name, lb_m.uuid)
-    lb_c.updated_at = lb_m.updated_at
-    lb_c.created_at = lb_m.created_at
-    lb_c.meta = lb_m.meta
-    for fd_m in lb_m.flowdetails:
-        lb_c.add(_convert_fd_to_external(fd_m))
-    return lb_c
-
-
-def _convert_lb_to_internal(lb_c):
-    lb_m = models.LogBook(uuid=lb_c.uuid, meta=lb_c.meta, name=lb_c.name)
-    lb_m.flowdetails = []
-    for fd_c in lb_c:
-        lb_m.flowdetails.append(_convert_fd_to_internal(fd_c, lb_c.uuid))
-    return lb_m
-
-
-def _logbook_get_model(lb_id, session):
-    entry = session.query(models.LogBook).filter_by(uuid=lb_id).first()
-    if entry is None:
-        raise exc.NotFound("No logbook found with id: %s" % lb_id)
-    return entry
-
-
-def _flow_details_get_model(flow_id, session):
-    entry = session.query(models.FlowDetail).filter_by(uuid=flow_id).first()
-    if entry is None:
-        raise exc.NotFound("No flow details found with id: %s" % flow_id)
-    return entry
-
-
-def _atom_details_get_model(atom_id, session):
-    entry = session.query(models.AtomDetail).filter_by(uuid=atom_id).first()
-    if entry is None:
-        raise exc.NotFound("No atom details found with id: %s" % atom_id)
-    return entry

taskflow/persistence/backends/sqlalchemy/models.py

@@ -1,97 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-from oslo_utils import uuidutils
-from sqlalchemy import Column, String, DateTime, Enum
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy import ForeignKey
-from sqlalchemy.orm import backref
-from sqlalchemy.orm import relationship
-from sqlalchemy import types as types
-
-from taskflow.persistence import logbook
-from taskflow import states
-
-BASE = declarative_base()
-
-
-# TODO(harlowja): remove when oslo.db exists
-class TimestampMixin(object):
-    created_at = Column(DateTime, default=timeutils.utcnow)
-    updated_at = Column(DateTime, onupdate=timeutils.utcnow)
-
-
-class Json(types.TypeDecorator):
-    impl = types.Text
-
-    def process_bind_param(self, value, dialect):
-        return jsonutils.dumps(value)
-
-    def process_result_value(self, value, dialect):
-        return jsonutils.loads(value)
-
-
-class ModelBase(TimestampMixin):
-    """Base model for all taskflow objects."""
-    uuid = Column(String, default=uuidutils.generate_uuid,
-                  primary_key=True, nullable=False, unique=True)
-    name = Column(String, nullable=True)
-    meta = Column(Json, nullable=True)
-
-
-class LogBook(BASE, ModelBase):
-    """Represents a logbook for a set of flows."""
-    __tablename__ = 'logbooks'
-
-    # Relationships
-    flowdetails = relationship("FlowDetail",
-                               single_parent=True,
-                               backref=backref("logbooks",
-                                               cascade="save-update, delete, "
-                                                       "merge"))
-
-
-class FlowDetail(BASE, ModelBase):
-    __tablename__ = 'flowdetails'
-
-    # Member variables
-    state = Column(String)
-
-    # Relationships
-    parent_uuid = Column(String, ForeignKey('logbooks.uuid'))
-    atomdetails = relationship("AtomDetail",
-                               single_parent=True,
-                               backref=backref("flowdetails",
-                                               cascade="save-update, delete, "
-                                                       "merge"))
-
-
-class AtomDetail(BASE, ModelBase):
-    __tablename__ = 'atomdetails'
-
-    # Member variables
-    atom_type = Column(Enum(*logbook.ATOM_TYPES, name='atom_types'))
-    state = Column(String)
-    intention = Column(Enum(*states.INTENTIONS, name='intentions'))
-    results = Column(Json)
-    failure = Column(Json)
-    version = Column(Json)
-
-    # Relationships
-    parent_uuid = Column(String, ForeignKey('flowdetails.uuid'))

taskflow/persistence/backends/sqlalchemy/tables.py

@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+
+from oslo.serialization import jsonutils
+from oslo.utils import timeutils
+from oslo.utils import uuidutils
+from sqlalchemy import Table, Column, String, ForeignKey, DateTime, Enum
+from sqlalchemy import types
+
+from taskflow.persistence import logbook
+from taskflow import states
+
+Tables = collections.namedtuple('Tables',
+                                ['logbooks', 'flowdetails', 'atomdetails'])
+
+# Column length limits...
+NAME_LENGTH = 255
+UUID_LENGTH = 64
+STATE_LENGTH = 255
+VERSION_LENGTH = 64
+
+
+class Json(types.TypeDecorator):
+    impl = types.Text
+
+    def process_bind_param(self, value, dialect):
+        if not value:
+            return None
+        return jsonutils.dumps(value)
+
+    def process_result_value(self, value, dialect):
+        if not value:
+            return None
+        return jsonutils.loads(value)
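
Note that, because of the falsey checks above, empty values do not
round-trip: binding {} or [] stores NULL, which reads back as None. For
example (direct method calls shown purely to illustrate; the dialect
argument is unused here):

    Json().process_bind_param({}, None)        # -> None (stored as NULL)
    Json().process_bind_param({'a': 1}, None)  # -> '{"a": 1}'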
+def fetch(metadata):
+    """Returns the master set of table objects (which is also their schema)."""
+    logbooks = Table('logbooks', metadata,
+                     Column('created_at', DateTime,
+                            default=timeutils.utcnow),
+                     Column('updated_at', DateTime,
+                            default=timeutils.utcnow),
+                     Column('meta', Json),
+                     Column('name', String(length=NAME_LENGTH)),
+                     Column('uuid', String(length=UUID_LENGTH),
+                            primary_key=True, nullable=False, unique=True,
+                            default=uuidutils.generate_uuid))
+    flowdetails = Table('flowdetails', metadata,
+                        Column('created_at', DateTime,
+                               default=timeutils.utcnow),
+                        Column('updated_at', DateTime,
+                               default=timeutils.utcnow),
+                        Column('parent_uuid', String(length=UUID_LENGTH),
+                               ForeignKey('logbooks.uuid',
+                                          ondelete='CASCADE')),
+                        Column('meta', Json),
+                        Column('name', String(length=NAME_LENGTH)),
+                        Column('state', String(length=STATE_LENGTH)),
+                        Column('uuid', String(length=UUID_LENGTH),
+                               primary_key=True, nullable=False, unique=True,
+                               default=uuidutils.generate_uuid))
+    atomdetails = Table('atomdetails', metadata,
+                        Column('created_at', DateTime,
+                               default=timeutils.utcnow),
+                        Column('updated_at', DateTime,
+                               default=timeutils.utcnow),
+                        Column('meta', Json),
+                        Column('parent_uuid', String(length=UUID_LENGTH),
+                               ForeignKey('flowdetails.uuid',
+                                          ondelete='CASCADE')),
+                        Column('name', String(length=NAME_LENGTH)),
+                        Column('version', String(length=VERSION_LENGTH)),
+                        Column('state', String(length=STATE_LENGTH)),
+                        Column('uuid', String(length=UUID_LENGTH),
+                               primary_key=True, nullable=False, unique=True,
+                               default=uuidutils.generate_uuid),
+                        Column('failure', Json),
+                        Column('results', Json),
+                        Column('atom_type', Enum(*logbook.ATOM_TYPES,
+                                                 name='atom_types')),
+                        Column('intention', Enum(*states.INTENTIONS,
+                                                 name='intentions')))
+    return Tables(logbooks, flowdetails, atomdetails)
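
Finally, a minimal sketch of exercising this module on its own (the
in-memory URL and sample values are assumptions for illustration):

    import sqlalchemy as sa

    metadata = sa.MetaData()
    tbls = fetch(metadata)
    engine = sa.create_engine('sqlite://')
    metadata.create_all(engine)   # emits CREATE TABLE for all three tables
    with engine.begin() as conn:
        conn.execute(tbls.logbooks.insert(),
                     {'uuid': 'b1', 'name': 'demo', 'meta': {'k': 'v'}})
        row = conn.execute(sa.select([tbls.logbooks])).first()
        assert dict(row.items())['meta'] == {'k': 'v'}   # Json round-trip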