Merge "Use the SQLAlchemy ORM"
This commit is contained in:
commit
7ce90a8a7f
|
@ -19,9 +19,8 @@ import alembic.command
|
||||||
import alembic.config
|
import alembic.config
|
||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
from sqlalchemy.ext.declarative import declarative_base
|
from sqlalchemy.ext.declarative import declarative_base
|
||||||
from sqlalchemy.orm import relationship
|
from sqlalchemy import orm
|
||||||
import sqlalchemy.pool
|
import sqlalchemy.pool
|
||||||
from sqlalchemy.sql import select
|
|
||||||
import voluptuous
|
import voluptuous
|
||||||
|
|
||||||
from zuul.connection import BaseConnection
|
from zuul.connection import BaseConnection
|
||||||
|
@ -30,6 +29,75 @@ BUILDSET_TABLE = 'zuul_buildset'
|
||||||
BUILD_TABLE = 'zuul_build'
|
BUILD_TABLE = 'zuul_build'
|
||||||
|
|
||||||
|
|
||||||
|
class DatabaseSession(object):
|
||||||
|
def __init__(self, connection):
|
||||||
|
self.connection = connection
|
||||||
|
self.session = connection.session
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, etype, value, tb):
|
||||||
|
if etype:
|
||||||
|
self.session().rollback()
|
||||||
|
else:
|
||||||
|
self.session().commit()
|
||||||
|
self.session().close()
|
||||||
|
self.session = None
|
||||||
|
|
||||||
|
def listFilter(self, query, column, value):
|
||||||
|
if value is None:
|
||||||
|
return query
|
||||||
|
if isinstance(value, list):
|
||||||
|
return query.filter(column.in_(value))
|
||||||
|
return query.filter(column == value)
|
||||||
|
|
||||||
|
def getBuilds(self, tenant=None, project=None, pipeline=None,
|
||||||
|
change=None, branch=None, patchset=None, ref=None,
|
||||||
|
newrev=None, uuid=None, job_name=None, voting=None,
|
||||||
|
node_name=None, result=None, limit=50, offset=0):
|
||||||
|
|
||||||
|
build_table = self.connection.zuul_build_table
|
||||||
|
buildset_table = self.connection.zuul_buildset_table
|
||||||
|
|
||||||
|
# contains_eager allows us to perform eager loading on the
|
||||||
|
# buildset *and* use that table in filters (unlike
|
||||||
|
# joinedload).
|
||||||
|
q = self.session().query(self.connection.buildModel).\
|
||||||
|
join(self.connection.buildSetModel).\
|
||||||
|
options(orm.contains_eager(self.connection.buildModel.buildset)).\
|
||||||
|
with_hint(build_table, 'USE INDEX (PRIMARY)', 'mysql')
|
||||||
|
|
||||||
|
q = self.listFilter(q, buildset_table.c.tenant, tenant)
|
||||||
|
q = self.listFilter(q, buildset_table.c.project, project)
|
||||||
|
q = self.listFilter(q, buildset_table.c.pipeline, pipeline)
|
||||||
|
q = self.listFilter(q, buildset_table.c.change, change)
|
||||||
|
q = self.listFilter(q, buildset_table.c.branch, branch)
|
||||||
|
q = self.listFilter(q, buildset_table.c.patchset, patchset)
|
||||||
|
q = self.listFilter(q, buildset_table.c.ref, ref)
|
||||||
|
q = self.listFilter(q, buildset_table.c.newrev, newrev)
|
||||||
|
q = self.listFilter(q, build_table.c.uuid, uuid)
|
||||||
|
q = self.listFilter(q, build_table.c.job_name, job_name)
|
||||||
|
q = self.listFilter(q, build_table.c.voting, voting)
|
||||||
|
q = self.listFilter(q, build_table.c.node_name, node_name)
|
||||||
|
q = self.listFilter(q, build_table.c.result, result)
|
||||||
|
|
||||||
|
q = q.order_by(build_table.c.id.desc()).\
|
||||||
|
limit(limit).\
|
||||||
|
offset(offset)
|
||||||
|
|
||||||
|
try:
|
||||||
|
return q.all()
|
||||||
|
except sqlalchemy.orm.exc.NoResultFound:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def createBuildSet(self, *args, **kw):
|
||||||
|
bs = self.connection.buildSetModel(*args, **kw)
|
||||||
|
self.session().add(bs)
|
||||||
|
self.session().flush()
|
||||||
|
return bs
|
||||||
|
|
||||||
|
|
||||||
class SQLConnection(BaseConnection):
|
class SQLConnection(BaseConnection):
|
||||||
driver_name = 'sql'
|
driver_name = 'sql'
|
||||||
log = logging.getLogger("zuul.SQLConnection")
|
log = logging.getLogger("zuul.SQLConnection")
|
||||||
|
@ -58,6 +126,18 @@ class SQLConnection(BaseConnection):
|
||||||
poolclass=sqlalchemy.pool.QueuePool,
|
poolclass=sqlalchemy.pool.QueuePool,
|
||||||
pool_recycle=self.connection_config.get('pool_recycle', 1))
|
pool_recycle=self.connection_config.get('pool_recycle', 1))
|
||||||
self._migrate()
|
self._migrate()
|
||||||
|
|
||||||
|
# If we want the objects returned from query() to be
|
||||||
|
# usable outside of the session, we need to expunge them
|
||||||
|
# from the session, and since the DatabaseSession always
|
||||||
|
# calls commit() on the session when the context manager
|
||||||
|
# exits, we need to inform the session not to expire
|
||||||
|
# objects when it does so.
|
||||||
|
self.session_factory = orm.sessionmaker(bind=self.engine,
|
||||||
|
expire_on_commit=False,
|
||||||
|
autoflush=False)
|
||||||
|
self.session = orm.scoped_session(self.session_factory)
|
||||||
|
|
||||||
self.tables_established = True
|
self.tables_established = True
|
||||||
except sa.exc.NoSuchModuleError:
|
except sa.exc.NoSuchModuleError:
|
||||||
self.log.exception(
|
self.log.exception(
|
||||||
|
@ -68,6 +148,9 @@ class SQLConnection(BaseConnection):
|
||||||
"Unable to connect to the database or establish the required "
|
"Unable to connect to the database or establish the required "
|
||||||
"tables. Reporter %s is disabled" % self)
|
"tables. Reporter %s is disabled" % self)
|
||||||
|
|
||||||
|
def getSession(self):
|
||||||
|
return DatabaseSession(self)
|
||||||
|
|
||||||
def _migrate(self):
|
def _migrate(self):
|
||||||
"""Perform the alembic migrations for this connection"""
|
"""Perform the alembic migrations for this connection"""
|
||||||
with self.engine.begin() as conn:
|
with self.engine.begin() as conn:
|
||||||
|
@ -89,24 +172,9 @@ class SQLConnection(BaseConnection):
|
||||||
def _setup_models(self):
|
def _setup_models(self):
|
||||||
Base = declarative_base(metadata=sa.MetaData())
|
Base = declarative_base(metadata=sa.MetaData())
|
||||||
|
|
||||||
class BuildModel(Base):
|
|
||||||
__tablename__ = self.table_prefix + BUILD_TABLE
|
|
||||||
id = sa.Column(sa.Integer, primary_key=True)
|
|
||||||
buildset_id = sa.Column(sa.String, sa.ForeignKey(
|
|
||||||
self.table_prefix + BUILDSET_TABLE + ".id"))
|
|
||||||
uuid = sa.Column(sa.String(36))
|
|
||||||
job_name = sa.Column(sa.String(255))
|
|
||||||
result = sa.Column(sa.String(255))
|
|
||||||
start_time = sa.Column(sa.DateTime)
|
|
||||||
end_time = sa.Column(sa.DateTime)
|
|
||||||
voting = sa.Column(sa.Boolean)
|
|
||||||
log_url = sa.Column(sa.String(255))
|
|
||||||
node_name = sa.Column(sa.String(255))
|
|
||||||
|
|
||||||
class BuildSetModel(Base):
|
class BuildSetModel(Base):
|
||||||
__tablename__ = self.table_prefix + BUILDSET_TABLE
|
__tablename__ = self.table_prefix + BUILDSET_TABLE
|
||||||
id = sa.Column(sa.Integer, primary_key=True)
|
id = sa.Column(sa.Integer, primary_key=True)
|
||||||
builds = relationship(BuildModel, lazy="subquery")
|
|
||||||
zuul_ref = sa.Column(sa.String(255))
|
zuul_ref = sa.Column(sa.String(255))
|
||||||
pipeline = sa.Column(sa.String(255))
|
pipeline = sa.Column(sa.String(255))
|
||||||
project = sa.Column(sa.String(255))
|
project = sa.Column(sa.String(255))
|
||||||
|
@ -121,6 +189,30 @@ class SQLConnection(BaseConnection):
|
||||||
message = sa.Column(sa.TEXT())
|
message = sa.Column(sa.TEXT())
|
||||||
tenant = sa.Column(sa.String(255))
|
tenant = sa.Column(sa.String(255))
|
||||||
|
|
||||||
|
def createBuild(self, *args, **kw):
|
||||||
|
session = orm.session.Session.object_session(self)
|
||||||
|
b = BuildModel(*args, **kw)
|
||||||
|
b.buildset_id = self.id
|
||||||
|
self.builds.append(b)
|
||||||
|
session.add(b)
|
||||||
|
session.flush()
|
||||||
|
return b
|
||||||
|
|
||||||
|
class BuildModel(Base):
|
||||||
|
__tablename__ = self.table_prefix + BUILD_TABLE
|
||||||
|
id = sa.Column(sa.Integer, primary_key=True)
|
||||||
|
buildset_id = sa.Column(sa.String, sa.ForeignKey(
|
||||||
|
self.table_prefix + BUILDSET_TABLE + ".id"))
|
||||||
|
uuid = sa.Column(sa.String(36))
|
||||||
|
job_name = sa.Column(sa.String(255))
|
||||||
|
result = sa.Column(sa.String(255))
|
||||||
|
start_time = sa.Column(sa.DateTime)
|
||||||
|
end_time = sa.Column(sa.DateTime)
|
||||||
|
voting = sa.Column(sa.Boolean)
|
||||||
|
log_url = sa.Column(sa.String(255))
|
||||||
|
node_name = sa.Column(sa.String(255))
|
||||||
|
buildset = orm.relationship(BuildSetModel, backref="builds")
|
||||||
|
|
||||||
self.buildModel = BuildModel
|
self.buildModel = BuildModel
|
||||||
self.buildSetModel = BuildSetModel
|
self.buildSetModel = BuildSetModel
|
||||||
return self.buildSetModel.__table__, self.buildModel.__table__
|
return self.buildSetModel.__table__, self.buildModel.__table__
|
||||||
|
@ -129,58 +221,10 @@ class SQLConnection(BaseConnection):
|
||||||
self.log.debug("Stopping SQL connection %s" % self.connection_name)
|
self.log.debug("Stopping SQL connection %s" % self.connection_name)
|
||||||
self.engine.dispose()
|
self.engine.dispose()
|
||||||
|
|
||||||
def query(self, args):
|
def getBuilds(self, *args, **kw):
|
||||||
build = self.zuul_build_table
|
"""Return a list of Build objects"""
|
||||||
buildset = self.zuul_buildset_table
|
with self.getSession() as db:
|
||||||
query = select([
|
return db.getBuilds(*args, **kw)
|
||||||
buildset.c.project,
|
|
||||||
buildset.c.branch,
|
|
||||||
buildset.c.pipeline,
|
|
||||||
buildset.c.change,
|
|
||||||
buildset.c.patchset,
|
|
||||||
buildset.c.ref,
|
|
||||||
buildset.c.newrev,
|
|
||||||
buildset.c.ref_url,
|
|
||||||
build.c.result,
|
|
||||||
build.c.uuid,
|
|
||||||
build.c.job_name,
|
|
||||||
build.c.voting,
|
|
||||||
build.c.node_name,
|
|
||||||
build.c.start_time,
|
|
||||||
build.c.end_time,
|
|
||||||
build.c.log_url]).select_from(build.join(buildset))
|
|
||||||
for table in ('build', 'buildset'):
|
|
||||||
for key, val in args['%s_filters' % table].items():
|
|
||||||
if table == 'build':
|
|
||||||
column = build.c
|
|
||||||
else:
|
|
||||||
column = buildset.c
|
|
||||||
query = query.where(getattr(column, key).in_(val))
|
|
||||||
return query.\
|
|
||||||
limit(args['limit']).\
|
|
||||||
offset(args['skip']).\
|
|
||||||
order_by(build.c.id.desc()).\
|
|
||||||
with_hint(build, 'USE INDEX (PRIMARY)', 'mysql')
|
|
||||||
|
|
||||||
def get_builds(self, args):
|
|
||||||
"""Return a list of build"""
|
|
||||||
builds = []
|
|
||||||
with self.engine.begin() as conn:
|
|
||||||
for row in conn.execute(self.query(args)):
|
|
||||||
build = dict(row)
|
|
||||||
# Convert date to iso format
|
|
||||||
if row.start_time:
|
|
||||||
build['start_time'] = row.start_time.strftime(
|
|
||||||
'%Y-%m-%dT%H:%M:%S')
|
|
||||||
if row.end_time:
|
|
||||||
build['end_time'] = row.end_time.strftime(
|
|
||||||
'%Y-%m-%dT%H:%M:%S')
|
|
||||||
# Compute run duration
|
|
||||||
if row.start_time and row.end_time:
|
|
||||||
build['duration'] = (row.end_time -
|
|
||||||
row.start_time).total_seconds()
|
|
||||||
builds.append(build)
|
|
||||||
return builds
|
|
||||||
|
|
||||||
|
|
||||||
def getSchema():
|
def getSchema():
|
||||||
|
|
|
@ -32,42 +32,31 @@ class SQLReporter(BaseReporter):
|
||||||
self.log.warn("SQL reporter (%s) is disabled " % self)
|
self.log.warn("SQL reporter (%s) is disabled " % self)
|
||||||
return
|
return
|
||||||
|
|
||||||
with self.connection.engine.begin() as conn:
|
with self.connection.getSession() as db:
|
||||||
change = getattr(item.change, 'number', None)
|
db_buildset = db.createBuildSet(
|
||||||
patchset = getattr(item.change, 'patchset', None)
|
tenant=item.pipeline.tenant.name,
|
||||||
ref = getattr(item.change, 'ref', '')
|
|
||||||
oldrev = getattr(item.change, 'oldrev', '')
|
|
||||||
newrev = getattr(item.change, 'newrev', '')
|
|
||||||
branch = getattr(item.change, 'branch', '')
|
|
||||||
buildset_ins = self.connection.zuul_buildset_table.insert().values(
|
|
||||||
zuul_ref=item.current_build_set.ref,
|
|
||||||
pipeline=item.pipeline.name,
|
pipeline=item.pipeline.name,
|
||||||
project=item.change.project.name,
|
project=item.change.project.name,
|
||||||
change=change,
|
change=getattr(item.change, 'number', None),
|
||||||
patchset=patchset,
|
patchset=getattr(item.change, 'patchset', None),
|
||||||
ref=ref,
|
ref=getattr(item.change, 'ref', ''),
|
||||||
oldrev=oldrev,
|
oldrev=getattr(item.change, 'oldrev', ''),
|
||||||
newrev=newrev,
|
newrev=getattr(item.change, 'newrev', ''),
|
||||||
|
branch=getattr(item.change, 'branch', ''),
|
||||||
|
zuul_ref=item.current_build_set.ref,
|
||||||
ref_url=item.change.url,
|
ref_url=item.change.url,
|
||||||
result=item.current_build_set.result,
|
result=item.current_build_set.result,
|
||||||
message=self._formatItemReport(
|
message=self._formatItemReport(item, with_jobs=False),
|
||||||
item, with_jobs=False),
|
|
||||||
tenant=item.pipeline.tenant.name,
|
|
||||||
branch=branch,
|
|
||||||
)
|
)
|
||||||
buildset_ins_result = conn.execute(buildset_ins)
|
|
||||||
build_inserts = []
|
|
||||||
|
|
||||||
for job in item.getJobs():
|
for job in item.getJobs():
|
||||||
build = item.current_build_set.getBuild(job.name)
|
build = item.current_build_set.getBuild(job.name)
|
||||||
if not build:
|
if not build:
|
||||||
# build hasn't began. The sql reporter can only send back
|
# build hasn't begun. The sql reporter can only send back
|
||||||
# stats about builds. It doesn't understand how to store
|
# stats about builds. It doesn't understand how to store
|
||||||
# information about the change.
|
# information about the change.
|
||||||
continue
|
continue
|
||||||
|
|
||||||
(result, url) = item.formatJobResult(job)
|
(result, url) = item.formatJobResult(job)
|
||||||
|
|
||||||
start = end = None
|
start = end = None
|
||||||
if build.start_time:
|
if build.start_time:
|
||||||
start = datetime.datetime.fromtimestamp(
|
start = datetime.datetime.fromtimestamp(
|
||||||
|
@ -78,19 +67,16 @@ class SQLReporter(BaseReporter):
|
||||||
build.end_time,
|
build.end_time,
|
||||||
tz=datetime.timezone.utc)
|
tz=datetime.timezone.utc)
|
||||||
|
|
||||||
build_inserts.append({
|
db_buildset.createBuild(
|
||||||
'buildset_id': buildset_ins_result.inserted_primary_key[0],
|
uuid=build.uuid,
|
||||||
'uuid': build.uuid,
|
job_name=build.job.name,
|
||||||
'job_name': build.job.name,
|
result=result,
|
||||||
'result': result,
|
start_time=start,
|
||||||
'start_time': start,
|
end_time=end,
|
||||||
'end_time': end,
|
voting=build.job.voting,
|
||||||
'voting': build.job.voting,
|
log_url=url,
|
||||||
'log_url': url,
|
node_name=build.node_name,
|
||||||
'node_name': build.node_name,
|
)
|
||||||
})
|
|
||||||
conn.execute(self.connection.zuul_build_table.insert(),
|
|
||||||
build_inserts)
|
|
||||||
|
|
||||||
|
|
||||||
def getSchema():
|
def getSchema():
|
||||||
|
|
|
@ -374,6 +374,44 @@ class ZuulWebAPI(object):
|
||||||
resp.headers['Content-Type'] = 'text/plain'
|
resp.headers['Content-Type'] = 'text/plain'
|
||||||
return job.data[0] + '\n'
|
return job.data[0] + '\n'
|
||||||
|
|
||||||
|
def buildToDict(self, build):
|
||||||
|
start_time = build.start_time
|
||||||
|
if build.start_time:
|
||||||
|
start_time = start_time.strftime(
|
||||||
|
'%Y-%m-%dT%H:%M:%S')
|
||||||
|
end_time = build.end_time
|
||||||
|
if build.end_time:
|
||||||
|
end_time = end_time.strftime(
|
||||||
|
'%Y-%m-%dT%H:%M:%S')
|
||||||
|
if build.start_time and build.end_time:
|
||||||
|
duration = (build.end_time -
|
||||||
|
build.start_time).total_seconds()
|
||||||
|
else:
|
||||||
|
duration = None
|
||||||
|
|
||||||
|
buildset = build.buildset
|
||||||
|
ret = {
|
||||||
|
'uuid': build.uuid,
|
||||||
|
'job_name': build.job_name,
|
||||||
|
'result': build.result,
|
||||||
|
'start_time': start_time,
|
||||||
|
'end_time': end_time,
|
||||||
|
'duration': duration,
|
||||||
|
'voting': build.voting,
|
||||||
|
'log_url': build.log_url,
|
||||||
|
'node_name': build.node_name,
|
||||||
|
|
||||||
|
'project': buildset.project,
|
||||||
|
'branch': buildset.branch,
|
||||||
|
'pipeline': buildset.pipeline,
|
||||||
|
'change': buildset.change,
|
||||||
|
'patchset': buildset.patchset,
|
||||||
|
'ref': buildset.ref,
|
||||||
|
'newrev': buildset.newrev,
|
||||||
|
'ref_url': buildset.ref_url,
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@cherrypy.tools.save_params()
|
@cherrypy.tools.save_params()
|
||||||
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
|
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
|
||||||
|
@ -391,31 +429,15 @@ class ZuulWebAPI(object):
|
||||||
|
|
||||||
connection = self.zuulweb.connections.connections[connection_name]
|
connection = self.zuulweb.connections.connections[connection_name]
|
||||||
|
|
||||||
args = {
|
builds = connection.getBuilds(
|
||||||
'buildset_filters': {'tenant': [tenant]},
|
tenant=tenant, project=project, pipeline=pipeline, change=change,
|
||||||
'build_filters': {},
|
branch=branch, patchset=patchset, ref=ref, newrev=newrev,
|
||||||
'limit': limit,
|
uuid=uuid, job_name=job_name, voting=voting, node_name=node_name,
|
||||||
'skip': skip,
|
result=result, limit=limit, offset=skip)
|
||||||
}
|
|
||||||
|
|
||||||
for k in ("project", "pipeline", "change", "branch",
|
|
||||||
"patchset", "ref", "newrev"):
|
|
||||||
v = locals()[k]
|
|
||||||
if v:
|
|
||||||
if not isinstance(v, list):
|
|
||||||
v = [v]
|
|
||||||
args['buildset_filters'].setdefault(k, []).extend(v)
|
|
||||||
for k in ("uuid", "job_name", "voting", "node_name",
|
|
||||||
"result"):
|
|
||||||
v = locals()[k]
|
|
||||||
if v:
|
|
||||||
if not isinstance(v, list):
|
|
||||||
v = [v]
|
|
||||||
args['build_filters'].setdefault(k, []).extend(v)
|
|
||||||
data = connection.get_builds(args)
|
|
||||||
resp = cherrypy.response
|
resp = cherrypy.response
|
||||||
resp.headers['Access-Control-Allow-Origin'] = '*'
|
resp.headers['Access-Control-Allow-Origin'] = '*'
|
||||||
return data
|
return [self.buildToDict(b) for b in builds]
|
||||||
|
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@cherrypy.tools.save_params()
|
@cherrypy.tools.save_params()
|
||||||
|
@ -431,16 +453,10 @@ class ZuulWebAPI(object):
|
||||||
|
|
||||||
connection = self.zuulweb.connections.connections[connection_name]
|
connection = self.zuulweb.connections.connections[connection_name]
|
||||||
|
|
||||||
args = {
|
data = connection.getBuilds(tenant=tenant, uuid=uuid, limit=1)
|
||||||
'buildset_filters': {'tenant': [tenant]},
|
|
||||||
'build_filters': {'uuid': [uuid]},
|
|
||||||
'limit': 1,
|
|
||||||
'skip': 0,
|
|
||||||
}
|
|
||||||
data = connection.get_builds(args)
|
|
||||||
if not data:
|
if not data:
|
||||||
raise cherrypy.HTTPError(404, "Build not found")
|
raise cherrypy.HTTPError(404, "Build not found")
|
||||||
data = data[0]
|
data = self.buildToDict(data[0])
|
||||||
resp = cherrypy.response
|
resp = cherrypy.response
|
||||||
resp.headers['Access-Control-Allow-Origin'] = '*'
|
resp.headers['Access-Control-Allow-Origin'] = '*'
|
||||||
return data
|
return data
|
||||||
|
|
Loading…
Reference in New Issue