Refactor bundle in sql connection
This refactors the sql connection to accomodate multiple simulataneous changes in a buildset. The change information is removed from the buildset table and placed in a ref table. Buildsets are associated with refs many-to-many via the zuul_buildset_ref table. Builds are also associated with refs, many-to-one, so that we can support multiple builds with the same job name in a buildset, but we still know which change they are for. In order to maintain a unique index in the new zuul_ref table (so that we only have one entry for a given ref-like object (change, branch, tag, ref)) we need to shorten the sha fields to 40 characters (to accomodate mysql's index size limit) and also avoid nulls (to accomodate postgres's inability to use null-safe comparison operators on indexes). So that we can continue to use change=None, patchset=None, etc, values in Python, we add a sqlalchemy TypeDectorator to coerce None to and from null-safe values such as 0 or the empty string. Some previous schema migration tests inserted data with null projects, which should never have actually happened, so these tests are updated to be more realistic since the new data migration requires non-null project fields. The migration itself has been tested with a data set consisting of about 3 million buildsets with 22 million builds. The runtime on one ssd-based test system in mysql is about 22 minutes and in postgres about 8 minutes. Change-Id: I21f3f3dfc8f93a23744856e5b82b3c948c118dc2
This commit is contained in:
28
releasenotes/notes/bundle-refactor-sql-11c892ee29feb7e7.yaml
Normal file
28
releasenotes/notes/bundle-refactor-sql-11c892ee29feb7e7.yaml
Normal file
@@ -0,0 +1,28 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
A significant SQL schema migration is included which requires
|
||||
extra care when upgrading.
|
||||
|
||||
It is recommended to make a database backup before upgrading in
|
||||
case of problems. It is also recommended to test the migration
|
||||
with a copy of the database in a development environment first in
|
||||
order to ascertain expected runtime and detect any data integrity
|
||||
problems early.
|
||||
|
||||
Zuul is unable to operate as normal during the schema upgrade.
|
||||
The following procedure is recommended:
|
||||
|
||||
* Perform a database backup
|
||||
* Stop all Zuul components
|
||||
* Start one scheduler and wait for it to complete the migration
|
||||
and initialization
|
||||
* Start the rest of Zuul
|
||||
|
||||
If the migration fails and the backing database is PostgreSQL, the
|
||||
migration will be rolled back and Zuul may be restarted on the
|
||||
previous version. If the backing database is MySQL, if the error
|
||||
happens early enough the migration may be rolled back (look for
|
||||
"Early error in schema migration, rolling back" in scheduler
|
||||
logs). If an error happens late in the migration, manual
|
||||
intervention may be required.
|
||||
@@ -870,7 +870,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
|
||||
self.assertEqual(A.data["status"], "NEW")
|
||||
self.assertEqual(B.data["status"], "NEW")
|
||||
|
||||
buildsets = {bs.change: bs for bs in
|
||||
buildsets = {bs.refs[0].change: bs for bs in
|
||||
self.scheds.first.connections.connections[
|
||||
'database'].getBuildsets()}
|
||||
self.assertEqual(buildsets[2].result, 'MERGE_FAILURE')
|
||||
|
||||
@@ -519,15 +519,6 @@ class DBPruneTestCase(ZuulTestCase):
|
||||
uuid=buildset_uuid,
|
||||
tenant='tenant-one',
|
||||
pipeline='check',
|
||||
project='org/project',
|
||||
change='1',
|
||||
patchset='1',
|
||||
ref='refs/changes/1',
|
||||
oldrev='',
|
||||
newrev='',
|
||||
branch='master',
|
||||
zuul_ref='Zref',
|
||||
ref_url='http://gerrit.example.com/1',
|
||||
event_id=event_id,
|
||||
event_timestamp=update_time,
|
||||
updated=update_time,
|
||||
@@ -535,9 +526,22 @@ class DBPruneTestCase(ZuulTestCase):
|
||||
last_build_end_time=end_time,
|
||||
result='SUCCESS',
|
||||
)
|
||||
db_ref = db.getOrCreateRef(
|
||||
project='org/project',
|
||||
ref='refs/changes/1',
|
||||
change=1,
|
||||
patchset='1',
|
||||
oldrev='',
|
||||
newrev='',
|
||||
branch='master',
|
||||
ref_url='http://gerrit.example.com/1',
|
||||
)
|
||||
db_buildset.refs.append(db_ref)
|
||||
|
||||
for build_num in range(2):
|
||||
build_uuid = uuid.uuid4().hex
|
||||
db_build = db_buildset.createBuild(
|
||||
ref=db_ref,
|
||||
uuid=build_uuid,
|
||||
job_name=f'job{build_num}',
|
||||
start_time=start_time,
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
# Copyright 2014 Rackspace Australia
|
||||
# Copyright 2023 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@@ -70,11 +71,21 @@ class TestSQLConnectionMysql(ZuulTestCase):
|
||||
table_prefix = connection.table_prefix
|
||||
self.assertEqual(self.expected_table_prefix, table_prefix)
|
||||
|
||||
ref_table = table_prefix + 'zuul_ref'
|
||||
buildset_table = table_prefix + 'zuul_buildset'
|
||||
buildset_ref_table = table_prefix + 'zuul_buildset_ref'
|
||||
build_table = table_prefix + 'zuul_build'
|
||||
artifact_table = table_prefix + 'zuul_artifact'
|
||||
provides_table = table_prefix + 'zuul_provides'
|
||||
build_event_table = table_prefix + 'zuul_build_event'
|
||||
|
||||
self.assertEqual(20, len(insp.get_columns(buildset_table)))
|
||||
self.assertEqual(13, len(insp.get_columns(build_table)))
|
||||
self.assertEqual(9, len(insp.get_columns(ref_table)))
|
||||
self.assertEqual(11, len(insp.get_columns(buildset_table)))
|
||||
self.assertEqual(2, len(insp.get_columns(buildset_ref_table)))
|
||||
self.assertEqual(14, len(insp.get_columns(build_table)))
|
||||
self.assertEqual(5, len(insp.get_columns(artifact_table)))
|
||||
self.assertEqual(3, len(insp.get_columns(provides_table)))
|
||||
self.assertEqual(5, len(insp.get_columns(build_event_table)))
|
||||
|
||||
def test_sql_tables_created(self):
|
||||
"Test the tables for storing results are created properly"
|
||||
@@ -87,18 +98,35 @@ class TestSQLConnectionMysql(ZuulTestCase):
|
||||
table_prefix = connection.table_prefix
|
||||
self.assertEqual(self.expected_table_prefix, table_prefix)
|
||||
|
||||
ref_table = table_prefix + 'zuul_ref'
|
||||
buildset_table = table_prefix + 'zuul_buildset'
|
||||
buildset_ref_table = table_prefix + 'zuul_buildset_ref'
|
||||
build_table = table_prefix + 'zuul_build'
|
||||
artifact_table = table_prefix + 'zuul_artifact'
|
||||
provides_table = table_prefix + 'zuul_provides'
|
||||
build_event_table = table_prefix + 'zuul_build_event'
|
||||
|
||||
indexes_ref = insp.get_indexes(ref_table)
|
||||
indexes_buildset = insp.get_indexes(buildset_table)
|
||||
indexes_buildset_ref = insp.get_indexes(buildset_ref_table)
|
||||
indexes_build = insp.get_indexes(build_table)
|
||||
indexes_artifact = insp.get_indexes(artifact_table)
|
||||
indexes_provides = insp.get_indexes(provides_table)
|
||||
indexes_build_event = insp.get_indexes(build_event_table)
|
||||
|
||||
self.assertEqual(4, len(indexes_buildset))
|
||||
self.assertEqual(3, len(indexes_build))
|
||||
self.assertEqual(3, len(indexes_ref))
|
||||
self.assertEqual(1, len(indexes_buildset))
|
||||
self.assertEqual(2, len(indexes_buildset_ref))
|
||||
self.assertEqual(4, len(indexes_build))
|
||||
self.assertEqual(1, len(indexes_artifact))
|
||||
self.assertEqual(1, len(indexes_provides))
|
||||
self.assertEqual(1, len(indexes_build_event))
|
||||
|
||||
# check if all indexes are prefixed
|
||||
if table_prefix:
|
||||
indexes = indexes_buildset + indexes_build
|
||||
indexes = (indexes_ref + indexes_buildset + indexes_buildset_ref +
|
||||
indexes_build + indexes_artifact + indexes_provides +
|
||||
indexes_build_event)
|
||||
for index in indexes:
|
||||
self.assertTrue(index['name'].startswith(table_prefix))
|
||||
|
||||
@@ -111,127 +139,63 @@ class TestSQLConnectionMysql(ZuulTestCase):
|
||||
|
||||
def check_results():
|
||||
# Grab the sa tables
|
||||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||
pipeline = tenant.layout.pipelines['check']
|
||||
reporter = self.scheds.first.connections.getSqlReporter(
|
||||
pipeline)
|
||||
with self.scheds.first.connections.getSqlConnection().\
|
||||
engine.connect() as conn:
|
||||
|
||||
result = conn.execute(
|
||||
sa.sql.select(reporter.connection.zuul_buildset_table))
|
||||
|
||||
buildsets = result.fetchall()
|
||||
connection = self.scheds.first.connections.getSqlConnection()
|
||||
with connection.getSession() as db:
|
||||
buildsets = list(reversed(db.getBuildsets()))
|
||||
self.assertEqual(5, len(buildsets))
|
||||
buildset0 = buildsets[0]
|
||||
buildset1 = buildsets[1]
|
||||
buildset2 = buildsets[2]
|
||||
buildset3 = buildsets[3]
|
||||
buildset4 = buildsets[4]
|
||||
|
||||
self.assertEqual('check', buildset0.pipeline)
|
||||
self.assertEqual('org/project', buildset0.project)
|
||||
self.assertEqual(1, buildset0.change)
|
||||
self.assertEqual('1', buildset0.patchset)
|
||||
self.assertEqual('SUCCESS', buildset0.result)
|
||||
self.assertEqual('Build succeeded.', buildset0.message)
|
||||
self.assertEqual('tenant-one', buildset0.tenant)
|
||||
self.assertEqual('check', buildsets[0].pipeline)
|
||||
self.assertEqual('org/project', buildsets[0].refs[0].project)
|
||||
self.assertEqual(1, buildsets[0].refs[0].change)
|
||||
self.assertEqual('1', buildsets[0].refs[0].patchset)
|
||||
self.assertEqual('SUCCESS', buildsets[0].result)
|
||||
self.assertEqual('Build succeeded.', buildsets[0].message)
|
||||
self.assertEqual('tenant-one', buildsets[0].tenant)
|
||||
self.assertEqual(
|
||||
'https://review.example.com/%d' % buildset0.change,
|
||||
buildset0.ref_url)
|
||||
self.assertNotEqual(None, buildset0.event_id)
|
||||
self.assertNotEqual(None, buildset0.event_timestamp)
|
||||
|
||||
buildset0_builds = conn.execute(
|
||||
sa.sql.select(
|
||||
reporter.connection.zuul_build_table
|
||||
).where(
|
||||
reporter.connection.zuul_build_table.c.buildset_id ==
|
||||
buildset0.id
|
||||
)
|
||||
).fetchall()
|
||||
'https://review.example.com/%d' %
|
||||
buildsets[0].refs[0].change,
|
||||
buildsets[0].refs[0].ref_url)
|
||||
self.assertNotEqual(None, buildsets[0].event_id)
|
||||
self.assertNotEqual(None, buildsets[0].event_timestamp)
|
||||
|
||||
# Check the first result, which should be the project-merge job
|
||||
self.assertEqual(
|
||||
'project-merge', buildset0_builds[0].job_name)
|
||||
self.assertEqual("SUCCESS", buildset0_builds[0].result)
|
||||
self.assertEqual(None, buildset0_builds[0].log_url)
|
||||
self.assertEqual('check', buildset1.pipeline)
|
||||
self.assertEqual('master', buildset1.branch)
|
||||
self.assertEqual('org/project', buildset1.project)
|
||||
self.assertEqual(2, buildset1.change)
|
||||
self.assertEqual('1', buildset1.patchset)
|
||||
self.assertEqual('FAILURE', buildset1.result)
|
||||
self.assertEqual('Build failed.', buildset1.message)
|
||||
|
||||
buildset1_builds = conn.execute(
|
||||
sa.sql.select(
|
||||
reporter.connection.zuul_build_table
|
||||
).where(
|
||||
reporter.connection.zuul_build_table.c.buildset_id ==
|
||||
buildset1.id
|
||||
)
|
||||
).fetchall()
|
||||
'project-merge', buildsets[0].builds[0].job_name)
|
||||
self.assertEqual("SUCCESS", buildsets[0].builds[0].result)
|
||||
self.assertEqual(None, buildsets[0].builds[0].log_url)
|
||||
self.assertEqual('check', buildsets[1].pipeline)
|
||||
self.assertEqual('master', buildsets[1].refs[0].branch)
|
||||
self.assertEqual('org/project', buildsets[1].refs[0].project)
|
||||
self.assertEqual(2, buildsets[1].refs[0].change)
|
||||
self.assertEqual('1', buildsets[1].refs[0].patchset)
|
||||
self.assertEqual('FAILURE', buildsets[1].result)
|
||||
self.assertEqual('Build failed.', buildsets[1].message)
|
||||
|
||||
# Check the second result, which should be the project-test1
|
||||
# job which failed
|
||||
self.assertEqual(
|
||||
'project-test1', buildset1_builds[1].job_name)
|
||||
self.assertEqual("FAILURE", buildset1_builds[1].result)
|
||||
self.assertEqual(None, buildset1_builds[1].log_url)
|
||||
|
||||
buildset2_builds = conn.execute(
|
||||
sa.sql.select(
|
||||
reporter.connection.zuul_build_table
|
||||
).where(
|
||||
reporter.connection.zuul_build_table.c.buildset_id ==
|
||||
buildset2.id
|
||||
)
|
||||
).fetchall()
|
||||
'project-test1', buildsets[1].builds[1].job_name)
|
||||
self.assertEqual("FAILURE", buildsets[1].builds[1].result)
|
||||
self.assertEqual(None, buildsets[1].builds[1].log_url)
|
||||
|
||||
# Check the first result, which should be the project-publish
|
||||
# job
|
||||
self.assertEqual('project-publish',
|
||||
buildset2_builds[0].job_name)
|
||||
self.assertEqual("SUCCESS", buildset2_builds[0].result)
|
||||
|
||||
buildset3_builds = conn.execute(
|
||||
sa.sql.select(
|
||||
reporter.connection.zuul_build_table
|
||||
).where(
|
||||
reporter.connection.zuul_build_table.c.buildset_id ==
|
||||
buildset3.id
|
||||
)
|
||||
).fetchall()
|
||||
buildsets[2].builds[0].job_name)
|
||||
self.assertEqual("SUCCESS", buildsets[2].builds[0].result)
|
||||
|
||||
self.assertEqual(
|
||||
'project-test1', buildset3_builds[1].job_name)
|
||||
self.assertEqual('NODE_FAILURE', buildset3_builds[1].result)
|
||||
self.assertEqual(None, buildset3_builds[1].log_url)
|
||||
self.assertIsNotNone(buildset3_builds[1].start_time)
|
||||
self.assertIsNotNone(buildset3_builds[1].end_time)
|
||||
'project-test1', buildsets[3].builds[1].job_name)
|
||||
self.assertEqual('NODE_FAILURE', buildsets[3].builds[1].result)
|
||||
self.assertEqual(None, buildsets[3].builds[1].log_url)
|
||||
self.assertIsNotNone(buildsets[3].builds[1].start_time)
|
||||
self.assertIsNotNone(buildsets[3].builds[1].end_time)
|
||||
self.assertGreaterEqual(
|
||||
buildset3_builds[1].end_time,
|
||||
buildset3_builds[1].start_time)
|
||||
buildsets[3].builds[1].end_time,
|
||||
buildsets[3].builds[1].start_time)
|
||||
|
||||
# Check the paused build result
|
||||
buildset4_builds = conn.execute(
|
||||
sa.sql.select(
|
||||
reporter.connection.zuul_build_table
|
||||
).where(
|
||||
reporter.connection.zuul_build_table.c.buildset_id ==
|
||||
buildset4.id
|
||||
).order_by(reporter.connection.zuul_build_table.c.id)
|
||||
).fetchall()
|
||||
|
||||
paused_build_events = conn.execute(
|
||||
sa.sql.select(
|
||||
reporter.connection.zuul_build_event_table
|
||||
).where(
|
||||
reporter.connection.zuul_build_event_table.c.build_id
|
||||
== buildset4_builds[0].id
|
||||
)
|
||||
).fetchall()
|
||||
paused_build_events = buildsets[4].builds[0].build_events
|
||||
|
||||
self.assertEqual(len(paused_build_events), 2)
|
||||
pause_event = paused_build_events[0]
|
||||
@@ -316,60 +280,42 @@ class TestSQLConnectionMysql(ZuulTestCase):
|
||||
# Check the results
|
||||
def check_results():
|
||||
# Grab the sa tables
|
||||
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
|
||||
pipeline = tenant.layout.pipelines['check']
|
||||
reporter = self.scheds.first.connections.getSqlReporter(
|
||||
pipeline)
|
||||
connection = self.scheds.first.connections.getSqlConnection()
|
||||
with connection.getSession() as db:
|
||||
buildsets = db.getBuildsets()
|
||||
|
||||
with self.scheds.first.connections.getSqlConnection().\
|
||||
engine.connect() as conn:
|
||||
|
||||
result = conn.execute(
|
||||
sa.sql.select(reporter.connection.zuul_buildset_table)
|
||||
)
|
||||
|
||||
buildsets = result.fetchall()
|
||||
self.assertEqual(1, len(buildsets))
|
||||
buildset0 = buildsets[0]
|
||||
|
||||
self.assertEqual('check', buildset0.pipeline)
|
||||
self.assertEqual('org/project', buildset0.project)
|
||||
self.assertEqual(1, buildset0.change)
|
||||
self.assertEqual('1', buildset0.patchset)
|
||||
self.assertEqual('org/project', buildset0.refs[0].project)
|
||||
self.assertEqual(1, buildset0.refs[0].change)
|
||||
self.assertEqual('1', buildset0.refs[0].patchset)
|
||||
self.assertEqual('SUCCESS', buildset0.result)
|
||||
self.assertEqual('Build succeeded.', buildset0.message)
|
||||
self.assertEqual('tenant-one', buildset0.tenant)
|
||||
self.assertEqual(
|
||||
'https://review.example.com/%d' % buildset0.change,
|
||||
buildset0.ref_url)
|
||||
'https://review.example.com/%d' % buildset0.refs[0].change,
|
||||
buildset0.refs[0].ref_url)
|
||||
|
||||
buildset0_builds = conn.execute(
|
||||
sa.sql.select(
|
||||
reporter.connection.zuul_build_table
|
||||
).where(
|
||||
reporter.connection.zuul_build_table.c.buildset_id ==
|
||||
buildset0.id
|
||||
)
|
||||
).fetchall()
|
||||
# Check the retry results
|
||||
self.assertEqual('project-merge', buildset0.builds[0].job_name)
|
||||
self.assertEqual('SUCCESS', buildset0.builds[0].result)
|
||||
self.assertTrue(buildset0.builds[0].final)
|
||||
|
||||
# Check the retry results
|
||||
self.assertEqual('project-merge', buildset0_builds[0].job_name)
|
||||
self.assertEqual('SUCCESS', buildset0_builds[0].result)
|
||||
self.assertTrue(buildset0_builds[0].final)
|
||||
self.assertEqual('project-test1', buildset0.builds[1].job_name)
|
||||
self.assertEqual('RETRY', buildset0.builds[1].result)
|
||||
self.assertFalse(buildset0.builds[1].final)
|
||||
self.assertEqual('project-test2', buildset0.builds[2].job_name)
|
||||
self.assertEqual('RETRY', buildset0.builds[2].result)
|
||||
self.assertFalse(buildset0.builds[2].final)
|
||||
|
||||
self.assertEqual('project-test1', buildset0_builds[1].job_name)
|
||||
self.assertEqual('RETRY', buildset0_builds[1].result)
|
||||
self.assertFalse(buildset0_builds[1].final)
|
||||
self.assertEqual('project-test2', buildset0_builds[2].job_name)
|
||||
self.assertEqual('RETRY', buildset0_builds[2].result)
|
||||
self.assertFalse(buildset0_builds[2].final)
|
||||
|
||||
self.assertEqual('project-test1', buildset0_builds[3].job_name)
|
||||
self.assertEqual('SUCCESS', buildset0_builds[3].result)
|
||||
self.assertTrue(buildset0_builds[3].final)
|
||||
self.assertEqual('project-test2', buildset0_builds[4].job_name)
|
||||
self.assertEqual('SUCCESS', buildset0_builds[4].result)
|
||||
self.assertTrue(buildset0_builds[4].final)
|
||||
self.assertEqual('project-test1', buildset0.builds[3].job_name)
|
||||
self.assertEqual('SUCCESS', buildset0.builds[3].result)
|
||||
self.assertTrue(buildset0.builds[3].final)
|
||||
self.assertEqual('project-test2', buildset0.builds[4].job_name)
|
||||
self.assertEqual('SUCCESS', buildset0.builds[4].result)
|
||||
self.assertTrue(buildset0.builds[4].final)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
@@ -402,6 +348,9 @@ class TestSQLConnectionMysql(ZuulTestCase):
|
||||
with self.scheds.first.connections.getSqlConnection().\
|
||||
engine.connect() as conn:
|
||||
|
||||
# We don't actually need to delete the zuul_ref entry
|
||||
result = conn.execute(sa.text(
|
||||
f"delete from {self.expected_table_prefix}zuul_buildset_ref;"))
|
||||
result = conn.execute(sa.text(
|
||||
f"delete from {self.expected_table_prefix}zuul_build;"))
|
||||
result = conn.execute(sa.text(
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Copyright 2021 Acme Gating, LLC
|
||||
# Copyright 2021-2023 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@@ -12,6 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import difflib
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
@@ -22,6 +23,9 @@ from tests.base import (
|
||||
BaseTestCase, MySQLSchemaFixture, PostgresqlSchemaFixture
|
||||
)
|
||||
|
||||
import sqlalchemy
|
||||
import testtools
|
||||
|
||||
|
||||
class DBBaseTestCase(BaseTestCase):
|
||||
def setUp(self):
|
||||
@@ -111,9 +115,11 @@ class TestMysqlDatabase(DBBaseTestCase):
|
||||
self.connection._migrate('c57e9e76b812')
|
||||
with self.connection.engine.begin() as connection:
|
||||
connection.exec_driver_sql(
|
||||
"insert into zuul_buildset (result) values ('SUCCESS')")
|
||||
"insert into zuul_buildset (project, result) "
|
||||
"values ('org/project', 'SUCCESS')")
|
||||
connection.exec_driver_sql(
|
||||
"insert into zuul_buildset (result) values ('MERGER_FAILURE')")
|
||||
"insert into zuul_buildset (project, result) "
|
||||
"values ('org/project', 'MERGER_FAILURE')")
|
||||
results = [r[0] for r in connection.exec_driver_sql(
|
||||
"select result from zuul_buildset")]
|
||||
self.assertEqual(results, ['SUCCESS', 'MERGER_FAILURE'])
|
||||
@@ -136,22 +142,26 @@ class TestMysqlDatabase(DBBaseTestCase):
|
||||
self.connection._migrate('4647def24b32')
|
||||
with self.connection.engine.begin() as connection:
|
||||
connection.exec_driver_sql(
|
||||
"insert into zuul_buildset (result) values ('SUCCESS')")
|
||||
"insert into zuul_buildset (project, result) "
|
||||
"values ('org/project', 'SUCCESS')")
|
||||
connection.exec_driver_sql(
|
||||
"insert into zuul_buildset (result, first_build_start_time) "
|
||||
"values ('SUCCESS', '2022-05-01 12:34:56')")
|
||||
"insert into zuul_buildset "
|
||||
"(project, result, first_build_start_time) "
|
||||
"values ('org/project', 'SUCCESS', '2022-05-01 12:34:56')")
|
||||
connection.exec_driver_sql(
|
||||
"insert into zuul_buildset (result, last_build_end_time) "
|
||||
"values ('SUCCESS', '2022-05-02 12:34:56')")
|
||||
"insert into zuul_buildset "
|
||||
"(project, result, last_build_end_time) "
|
||||
"values ('org/project', 'SUCCESS', '2022-05-02 12:34:56')")
|
||||
connection.exec_driver_sql(
|
||||
"insert into zuul_buildset (result, event_timestamp) "
|
||||
"values ('SUCCESS', '2022-05-03 12:34:56')")
|
||||
"insert into zuul_buildset "
|
||||
"(project, result, event_timestamp) "
|
||||
"values ('org/project', 'SUCCESS', '2022-05-03 12:34:56')")
|
||||
connection.exec_driver_sql(
|
||||
"insert into zuul_buildset (result, "
|
||||
"insert into zuul_buildset (project, result, "
|
||||
"first_build_start_time, "
|
||||
"last_build_end_time, "
|
||||
"event_timestamp)"
|
||||
"values ('SUCCESS', "
|
||||
"values ('org/project', 'SUCCESS', "
|
||||
"'2022-05-11 12:34:56', "
|
||||
"'2022-05-12 12:34:56', "
|
||||
"'2022-05-13 12:34:56')")
|
||||
@@ -167,29 +177,165 @@ class TestMysqlDatabase(DBBaseTestCase):
|
||||
'2022-05-03 12:34:56',
|
||||
'2022-05-13 12:34:56'])
|
||||
|
||||
def test_migration_f7843ddf1552(self):
|
||||
with self.connection.engine.begin() as connection:
|
||||
connection.exec_driver_sql("set foreign_key_checks=0")
|
||||
for table in connection.exec_driver_sql("show tables"):
|
||||
table = table[0]
|
||||
connection.exec_driver_sql(f"drop table {table}")
|
||||
connection.exec_driver_sql("set foreign_key_checks=1")
|
||||
|
||||
self.connection.force_migrations = True
|
||||
self.connection._migrate('151893067f91')
|
||||
with self.connection.engine.begin() as connection:
|
||||
connection.exec_driver_sql("""
|
||||
insert into zuul_buildset
|
||||
(id, uuid, zuul_ref, project, `change`, patchset, ref,
|
||||
ref_url, oldrev, newrev, branch)
|
||||
values
|
||||
(1, "bsuuid1", "Z1", "project1",
|
||||
1, "1",
|
||||
"refs/changes/1",
|
||||
"http://project1/1",
|
||||
"2d48fe5afe0bc7785294b28e9e96a6c622945b9d",
|
||||
"8a938aff20d691d1b9a9f461c6375f0c45acd305",
|
||||
"master"),
|
||||
(2, "bsuuid2", "Z2", "project1",
|
||||
2, "ee743613ce5b3aee11d12e91e932d7876bc0b40c",
|
||||
"refs/changes/2",
|
||||
"http://project1/2",
|
||||
"bd11c4ff79245ae555418e19840ce4752e48ea38",
|
||||
"d1ffc204e4b4955f2a47e7e37618fcf13bb1b9fd",
|
||||
"stable"),
|
||||
(3, "bsuuid3", "Z3", "project2", NULL, NULL, "refs/tags/foo",
|
||||
"http://project3",
|
||||
"a6cf0e07e9e2964a810196f764fc4ac766742568",
|
||||
"1a226acbd022968914574fdeb467b78bc5bfcc77",
|
||||
NULL)
|
||||
""")
|
||||
connection.exec_driver_sql("""
|
||||
insert into zuul_build
|
||||
(id, buildset_id, uuid, job_name, result)
|
||||
values
|
||||
(1, 1, "builduuid1", "job1", "RESULT1"),
|
||||
(2, 1, "builduuid2", "job2", "RESULT2"),
|
||||
(3, 2, "builduuid3", "job1", "RESULT3"),
|
||||
(4, 2, "builduuid4", "job2", "RESULT4"),
|
||||
(5, 3, "builduuid5", "job3", "RESULT5"),
|
||||
(6, 3, "builduuid6", "job4", "RESULT6")
|
||||
""")
|
||||
|
||||
self.connection._migrate()
|
||||
with self.connection.engine.begin() as connection:
|
||||
results = [r for r in connection.exec_driver_sql(
|
||||
"select b.id, r.ref from zuul_build b join zuul_ref r "
|
||||
"on b.ref_id=r.id order by b.id")]
|
||||
self.assertEqual(results, [
|
||||
(1, 'refs/changes/1'),
|
||||
(2, 'refs/changes/1'),
|
||||
(3, 'refs/changes/2'),
|
||||
(4, 'refs/changes/2'),
|
||||
(5, 'refs/tags/foo'),
|
||||
(6, 'refs/tags/foo'),
|
||||
])
|
||||
results = [r for r in connection.exec_driver_sql(
|
||||
"select bs.uuid, r.ref "
|
||||
"from zuul_buildset bs, zuul_buildset_ref t, zuul_ref r "
|
||||
"where bs.id = t.buildset_id and r.id = t.ref_id "
|
||||
"order by bs.uuid")]
|
||||
self.assertEqual(results, [
|
||||
('bsuuid1', 'refs/changes/1'),
|
||||
('bsuuid2', 'refs/changes/2'),
|
||||
('bsuuid3', 'refs/tags/foo'),
|
||||
])
|
||||
|
||||
def test_migration_f7843ddf1552_failure(self):
|
||||
with self.connection.engine.begin() as connection:
|
||||
connection.exec_driver_sql("set foreign_key_checks=0")
|
||||
for table in connection.exec_driver_sql("show tables"):
|
||||
table = table[0]
|
||||
connection.exec_driver_sql(f"drop table {table}")
|
||||
connection.exec_driver_sql("set foreign_key_checks=1")
|
||||
|
||||
self.connection.force_migrations = True
|
||||
self.connection._migrate('151893067f91')
|
||||
# This test is identical to the one above, except the patchset
|
||||
# sha for the first buildset row is too long to fit in the
|
||||
# column (it's lengith is double). The insert here is fine,
|
||||
# but that will cause the migration to fail.
|
||||
with self.connection.engine.begin() as connection:
|
||||
connection.exec_driver_sql("""
|
||||
insert into zuul_buildset
|
||||
(id, uuid, zuul_ref, project, `change`, patchset, ref,
|
||||
ref_url, oldrev, newrev, branch)
|
||||
values
|
||||
(1, "bsuuid1", "Z1", "project1",
|
||||
1, "1",
|
||||
"refs/changes/1",
|
||||
"http://project1/1",
|
||||
"2d48fe5afe0bc7785294b28e9e96a6c622945b9d"
|
||||
"2d48fe5afe0bc7785294b28e9e96a6c622945b9d",
|
||||
"8a938aff20d691d1b9a9f461c6375f0c45acd305",
|
||||
"master"),
|
||||
(2, "bsuuid2", "Z2", "project1",
|
||||
2, "ee743613ce5b3aee11d12e91e932d7876bc0b40c",
|
||||
"refs/changes/2",
|
||||
"http://project1/2",
|
||||
"bd11c4ff79245ae555418e19840ce4752e48ea38",
|
||||
"d1ffc204e4b4955f2a47e7e37618fcf13bb1b9fd",
|
||||
"stable"),
|
||||
(3, "bsuuid3", "Z3", "project2", NULL, NULL, "refs/tags/foo",
|
||||
"http://project3",
|
||||
"a6cf0e07e9e2964a810196f764fc4ac766742568",
|
||||
"1a226acbd022968914574fdeb467b78bc5bfcc77",
|
||||
NULL)
|
||||
""")
|
||||
connection.exec_driver_sql("""
|
||||
insert into zuul_build
|
||||
(id, buildset_id, uuid, job_name, result)
|
||||
values
|
||||
(1, 1, "builduuid1", "job1", "RESULT1"),
|
||||
(2, 1, "builduuid2", "job2", "RESULT2"),
|
||||
(3, 2, "builduuid3", "job1", "RESULT3"),
|
||||
(4, 2, "builduuid4", "job2", "RESULT4"),
|
||||
(5, 3, "builduuid5", "job3", "RESULT5"),
|
||||
(6, 3, "builduuid6", "job4", "RESULT6")
|
||||
""")
|
||||
|
||||
with testtools.ExpectedException(sqlalchemy.exc.DataError):
|
||||
self.connection._migrate()
|
||||
|
||||
with self.connection.engine.begin() as connection:
|
||||
tables = [r[0] for r in connection.exec_driver_sql(
|
||||
"show tables")]
|
||||
# Make sure we rolled back the buildset_new table creation
|
||||
self.assertNotIn('zuul_buildset_new', tables)
|
||||
self.assertIn('zuul_buildset', tables)
|
||||
|
||||
def test_buildsets(self):
|
||||
tenant = 'tenant1',
|
||||
buildset_uuid = 'deadbeef'
|
||||
change = 1234
|
||||
buildset_args = dict(
|
||||
uuid=buildset_uuid,
|
||||
tenant=tenant,
|
||||
pipeline='check',
|
||||
project='project',
|
||||
change=change,
|
||||
patchset='1',
|
||||
ref='',
|
||||
oldrev='',
|
||||
newrev='',
|
||||
branch='master',
|
||||
zuul_ref='Zdeadbeef',
|
||||
ref_url='http://example.com/1234',
|
||||
event_id='eventid',
|
||||
)
|
||||
|
||||
# Create the buildset entry (driver-internal interface)
|
||||
with self.connection.getSession() as db:
|
||||
db.createBuildSet(**buildset_args)
|
||||
bs = db.createBuildSet(
|
||||
uuid=buildset_uuid,
|
||||
tenant=tenant,
|
||||
pipeline='check',
|
||||
event_id='eventid',
|
||||
)
|
||||
ref = db.getOrCreateRef(
|
||||
project='project',
|
||||
change=change,
|
||||
patchset='1',
|
||||
ref='',
|
||||
oldrev='',
|
||||
newrev='',
|
||||
branch='master',
|
||||
ref_url='http://example.com/1234',
|
||||
)
|
||||
bs.refs.append(ref)
|
||||
|
||||
# Verify that worked using the driver-external interface
|
||||
self.assertEqual(len(self.connection.getBuildsets()), 1)
|
||||
@@ -198,7 +344,7 @@ class TestMysqlDatabase(DBBaseTestCase):
|
||||
# Update the buildset using the internal interface
|
||||
with self.connection.getSession() as db:
|
||||
db_buildset = db.getBuildset(tenant=tenant, uuid=buildset_uuid)
|
||||
self.assertEqual(db_buildset.change, change)
|
||||
self.assertEqual(db_buildset.refs[0].change, change)
|
||||
db_buildset.result = 'SUCCESS'
|
||||
|
||||
# Verify that worked
|
||||
@@ -252,4 +398,151 @@ class TestPostgresqlDatabase(DBBaseTestCase):
|
||||
shell=True,
|
||||
env={'PGPASSWORD': self.db.passwd}
|
||||
)
|
||||
self.assertEqual(alembic_out, sqlalchemy_out)
|
||||
try:
|
||||
self.assertEqual(alembic_out, sqlalchemy_out)
|
||||
except Exception:
|
||||
differ = difflib.Differ()
|
||||
alembic_out = alembic_out.decode('utf8').splitlines()
|
||||
sqlalchemy_out = sqlalchemy_out.decode('utf8').splitlines()
|
||||
diff = '\n'.join(list(differ.compare(alembic_out, sqlalchemy_out)))
|
||||
self.log.debug("Diff:\n%s", diff)
|
||||
raise
|
||||
|
||||
def test_migration_f7843ddf1552(self):
|
||||
with self.connection.engine.begin() as connection:
|
||||
tables = [x[0] for x in connection.exec_driver_sql(
|
||||
"select tablename from pg_catalog.pg_tables "
|
||||
"where schemaname='public'"
|
||||
).all()]
|
||||
|
||||
self.assertTrue(len(tables) > 0)
|
||||
for table in tables:
|
||||
connection.exec_driver_sql(f"drop table {table} cascade")
|
||||
|
||||
self.connection.force_migrations = True
|
||||
self.connection._migrate('151893067f91')
|
||||
with self.connection.engine.begin() as connection:
|
||||
connection.exec_driver_sql("""
|
||||
insert into zuul_buildset
|
||||
(id, uuid, zuul_ref, project, change, patchset, ref,
|
||||
ref_url, oldrev, newrev, branch)
|
||||
values
|
||||
(1, 'bsuuid1', 'Z1', 'project1',
|
||||
1, '1',
|
||||
'refs/changes/1',
|
||||
'http://project1/1',
|
||||
'2d48fe5afe0bc7785294b28e9e96a6c622945b9d',
|
||||
'8a938aff20d691d1b9a9f461c6375f0c45acd305',
|
||||
'master'),
|
||||
(2, 'bsuuid2', 'Z2', 'project1',
|
||||
2, 'ee743613ce5b3aee11d12e91e932d7876bc0b40c',
|
||||
'refs/changes/2',
|
||||
'http://project1/2',
|
||||
'bd11c4ff79245ae555418e19840ce4752e48ea38',
|
||||
'd1ffc204e4b4955f2a47e7e37618fcf13bb1b9fd',
|
||||
'stable'),
|
||||
(3, 'bsuuid3', 'Z3', 'project2', NULL, NULL, 'refs/tags/foo',
|
||||
'http://project3',
|
||||
'a6cf0e07e9e2964a810196f764fc4ac766742568',
|
||||
'1a226acbd022968914574fdeb467b78bc5bfcc77',
|
||||
NULL)
|
||||
""")
|
||||
connection.exec_driver_sql("""
|
||||
insert into zuul_build
|
||||
(id, buildset_id, uuid, job_name, result)
|
||||
values
|
||||
(1, 1, 'builduuid1', 'job1', 'RESULT1'),
|
||||
(2, 1, 'builduuid2', 'job2', 'RESULT2'),
|
||||
(3, 2, 'builduuid3', 'job1', 'RESULT3'),
|
||||
(4, 2, 'builduuid4', 'job2', 'RESULT4'),
|
||||
(5, 3, 'builduuid5', 'job3', 'RESULT5'),
|
||||
(6, 3, 'builduuid6', 'job4', 'RESULT6')
|
||||
""")
|
||||
|
||||
self.connection._migrate()
|
||||
with self.connection.engine.begin() as connection:
|
||||
results = [r for r in connection.exec_driver_sql(
|
||||
"select b.id, r.ref from zuul_build b join zuul_ref r "
|
||||
"on b.ref_id=r.id order by b.id")]
|
||||
self.assertEqual(results, [
|
||||
(1, 'refs/changes/1'),
|
||||
(2, 'refs/changes/1'),
|
||||
(3, 'refs/changes/2'),
|
||||
(4, 'refs/changes/2'),
|
||||
(5, 'refs/tags/foo'),
|
||||
(6, 'refs/tags/foo'),
|
||||
])
|
||||
results = [r for r in connection.exec_driver_sql(
|
||||
"select bs.uuid, r.ref "
|
||||
"from zuul_buildset bs, zuul_buildset_ref t, zuul_ref r "
|
||||
"where bs.id = t.buildset_id and r.id = t.ref_id "
|
||||
"order by bs.uuid")]
|
||||
self.assertEqual(results, [
|
||||
('bsuuid1', 'refs/changes/1'),
|
||||
('bsuuid2', 'refs/changes/2'),
|
||||
('bsuuid3', 'refs/tags/foo'),
|
||||
])
|
||||
|
||||
def test_migration_f7843ddf1552_failure(self):
|
||||
with self.connection.engine.begin() as connection:
|
||||
tables = [x[0] for x in connection.exec_driver_sql(
|
||||
"select tablename from pg_catalog.pg_tables "
|
||||
"where schemaname='public'"
|
||||
).all()]
|
||||
|
||||
self.assertTrue(len(tables) > 0)
|
||||
for table in tables:
|
||||
connection.exec_driver_sql(f"drop table {table} cascade")
|
||||
|
||||
self.connection.force_migrations = True
|
||||
self.connection._migrate('151893067f91')
|
||||
with self.connection.engine.begin() as connection:
|
||||
connection.exec_driver_sql("""
|
||||
insert into zuul_buildset
|
||||
(id, uuid, zuul_ref, project, change, patchset, ref,
|
||||
ref_url, oldrev, newrev, branch)
|
||||
values
|
||||
(1, 'bsuuid1', 'Z1', 'project1',
|
||||
1, '1',
|
||||
'refs/changes/1',
|
||||
'http://project1/1',
|
||||
'2d48fe5afe0bc7785294b28e9e96a6c622945b9d'
|
||||
'2d48fe5afe0bc7785294b28e9e96a6c622945b9d',
|
||||
'8a938aff20d691d1b9a9f461c6375f0c45acd305',
|
||||
'master'),
|
||||
(2, 'bsuuid2', 'Z2', 'project1',
|
||||
2, 'ee743613ce5b3aee11d12e91e932d7876bc0b40c',
|
||||
'refs/changes/2',
|
||||
'http://project1/2',
|
||||
'bd11c4ff79245ae555418e19840ce4752e48ea38',
|
||||
'd1ffc204e4b4955f2a47e7e37618fcf13bb1b9fd',
|
||||
'stable'),
|
||||
(3, 'bsuuid3', 'Z3', 'project2', NULL, NULL, 'refs/tags/foo',
|
||||
'http://project3',
|
||||
'a6cf0e07e9e2964a810196f764fc4ac766742568',
|
||||
'1a226acbd022968914574fdeb467b78bc5bfcc77',
|
||||
NULL)
|
||||
""")
|
||||
connection.exec_driver_sql("""
|
||||
insert into zuul_build
|
||||
(id, buildset_id, uuid, job_name, result)
|
||||
values
|
||||
(1, 1, 'builduuid1', 'job1', 'RESULT1'),
|
||||
(2, 1, 'builduuid2', 'job2', 'RESULT2'),
|
||||
(3, 2, 'builduuid3', 'job1', 'RESULT3'),
|
||||
(4, 2, 'builduuid4', 'job2', 'RESULT4'),
|
||||
(5, 3, 'builduuid5', 'job3', 'RESULT5'),
|
||||
(6, 3, 'builduuid6', 'job4', 'RESULT6')
|
||||
""")
|
||||
|
||||
with testtools.ExpectedException(sqlalchemy.exc.DataError):
|
||||
self.connection._migrate()
|
||||
|
||||
with self.connection.engine.begin() as connection:
|
||||
tables = [x[0] for x in connection.exec_driver_sql(
|
||||
"select tablename from pg_catalog.pg_tables "
|
||||
"where schemaname='public'"
|
||||
).all()]
|
||||
# Make sure we rolled back the buildset_new table creation
|
||||
self.assertNotIn('zuul_buildset_new', tables)
|
||||
self.assertIn('zuul_buildset', tables)
|
||||
|
||||
572
zuul/driver/sql/alembic/versions/f7843ddf1552_bundle_refactor.py
Normal file
572
zuul/driver/sql/alembic/versions/f7843ddf1552_bundle_refactor.py
Normal file
@@ -0,0 +1,572 @@
|
||||
# Copyright 2023 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""bundle_refactor
|
||||
|
||||
Revision ID: f7843ddf1552
|
||||
Revises: 151893067f91
|
||||
Create Date: 2023-09-16 09:25:00.674820
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'f7843ddf1552'
|
||||
down_revision = '151893067f91'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
import logging
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
REF_TABLE = 'zuul_ref'
|
||||
BUILD_TABLE = 'zuul_build'
|
||||
BUILDSET_TABLE = 'zuul_buildset'
|
||||
BUILDSET_REF_TABLE = 'zuul_buildset_ref'
|
||||
ARTIFACT_TABLE = 'zuul_artifact'
|
||||
BUILD_EVENT_TABLE = 'zuul_build_event'
|
||||
PROVIDES_TABLE = 'zuul_provides'
|
||||
|
||||
|
||||
def rename_index(connection, table, old, new):
|
||||
dialect_name = connection.engine.dialect.name
|
||||
if dialect_name == 'mysql':
|
||||
statement = f"""
|
||||
alter table {table}
|
||||
rename index {old}
|
||||
to {new}
|
||||
"""
|
||||
elif dialect_name == 'postgresql':
|
||||
statement = f"""
|
||||
alter index {old}
|
||||
rename to {new}
|
||||
"""
|
||||
else:
|
||||
raise Exception(f"Unsupported dialect {dialect_name}")
|
||||
connection.execute(sa.text(statement))
|
||||
|
||||
|
||||
# This migration has an unusual structure. We do quite a bit of work
|
||||
# on temporary tables before we start replacing our real tables.
|
||||
# Mysql doesn't support transactional DDL, but we can fairly easily
|
||||
# undo our changes up to a certain point just by dropping our
|
||||
# temporary tables. To make life easier on operators in case there is
|
||||
# a problem, if we encounter an error up to the point of no return, we
|
||||
# will drop the tables and leave the db in the state it started. If
|
||||
# we encounter an error after that point, there is little we can do to
|
||||
# recover, but hopefully we will have found errors prior to then, if
|
||||
# any.
|
||||
|
||||
# Postgres has transactional DDL and the entire migraction is in a
|
||||
# single transaction and is automatically rolled back. We just no-op
|
||||
# in that case.
|
||||
|
||||
# To accomodate this structure without having a giant try/except
|
||||
# block, the migration is split into 2 functions, upgrade1() and
|
||||
# upgrade2().
|
||||
|
||||
def upgrade1(connection, table_prefix):
|
||||
# This is the first part of the migration, which is recoverable in
|
||||
# mysql.
|
||||
prefixed_ref = table_prefix + REF_TABLE
|
||||
prefixed_build = table_prefix + BUILD_TABLE
|
||||
prefixed_build_new = table_prefix + BUILD_TABLE + '_new'
|
||||
prefixed_buildset = table_prefix + BUILDSET_TABLE
|
||||
prefixed_buildset_new = table_prefix + BUILDSET_TABLE + '_new'
|
||||
prefixed_buildset_ref = table_prefix + BUILDSET_REF_TABLE
|
||||
quote = connection.engine.dialect.identifier_preparer.quote
|
||||
dialect_name = connection.engine.dialect.name
|
||||
|
||||
# This migration requires updates to existing rows in the
|
||||
# zuul_build and zuul_buildset tables. In postgres, tables have a
|
||||
# fill factor which indicates how much space to leave in pages for
|
||||
# row updates. With a high fill factor (the default is 100%)
|
||||
# large updates can be slow. With a smaller fill factor, large
|
||||
# updates can bu much faster, at the cost of wasted space and
|
||||
# operational overhead. The default of 100% makes sense for all
|
||||
# of our tables. While the build and buildset tables do get some
|
||||
# row updates, they are not very frequent. We would need a very
|
||||
# generous fill factor to be able to update all of the rows in the
|
||||
# build table quickly, and that wouldn't make sense for normal
|
||||
# operation.
|
||||
|
||||
# Instead of adding columns and updating the table, we will
|
||||
# create new tables and populate them with inserts (which is
|
||||
# extremely fast), then remove the old tables and rename the new.
|
||||
|
||||
# First, create zuul_buildset_new table. This will later replace
|
||||
# the zuul_buildset table. It includes some changes:
|
||||
|
||||
# * We intentionally omit the zuul_ref column (in this case,
|
||||
# referring to the old Z<sha> unique ids) because it is obsolete
|
||||
# is obsolete.
|
||||
|
||||
# * The length of the sha fields is lowered to 40. This is so
|
||||
# that in the zuul_ref table, we can make a unique index of
|
||||
# several fields without hitting mysql's index length limit.
|
||||
# We will mutate their values as we insert them here.
|
||||
op.create_table(
|
||||
prefixed_buildset_new,
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('pipeline', sa.String(255)),
|
||||
sa.Column('message', sa.TEXT()),
|
||||
sa.Column('tenant', sa.String(255)),
|
||||
sa.Column('result', sa.String(255)),
|
||||
sa.Column('uuid', sa.String(36)),
|
||||
sa.Column('event_id', sa.String(255), nullable=True),
|
||||
sa.Column('event_timestamp', sa.DateTime, nullable=True),
|
||||
sa.Column('first_build_start_time', sa.DateTime, nullable=True),
|
||||
sa.Column('last_build_end_time', sa.DateTime, nullable=True),
|
||||
sa.Column('updated', sa.DateTime, nullable=True),
|
||||
# Columns we'll drop later, but need now in order to populate
|
||||
# zuul_ref.
|
||||
sa.Column('project', sa.String(255), nullable=False),
|
||||
sa.Column('ref', sa.String(255), nullable=False),
|
||||
sa.Column('ref_url', sa.String(255), nullable=False),
|
||||
sa.Column('change', sa.Integer, nullable=False),
|
||||
sa.Column('patchset', sa.String(40), nullable=False),
|
||||
sa.Column('oldrev', sa.String(40), nullable=False),
|
||||
sa.Column('newrev', sa.String(40), nullable=False),
|
||||
sa.Column('branch', sa.String(255), nullable=False),
|
||||
)
|
||||
|
||||
# The postgres operator "is not distinct from" (equivalent to
|
||||
# mysql's <=>) is a non-indexable operator. So that we can
|
||||
# actually use the unique index (and other indexes in the future)
|
||||
# make all of the ref-related columns non-null. That means empty
|
||||
# strings for strings, and we'll use 0 for the change id. This
|
||||
# lets us use the "=" operator and utilize the index for all
|
||||
# values.
|
||||
statement = f"""
|
||||
insert into {prefixed_buildset_new}
|
||||
select bs.id, bs.pipeline, bs.message,
|
||||
bs.tenant, bs.result, bs.uuid, bs.event_id, bs.event_timestamp,
|
||||
bs.first_build_start_time, bs.last_build_end_time, bs.updated,
|
||||
bs.project, coalesce(bs.ref, ''), coalesce(ref_url, ''),
|
||||
coalesce(bs.change, 0), coalesce(bs.patchset, ''),
|
||||
coalesce(bs.oldrev, ''), coalesce(bs.newrev, ''),
|
||||
coalesce(branch, '')
|
||||
from {prefixed_buildset} bs
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
|
||||
# Create zuul_ref table.
|
||||
op.create_table(
|
||||
prefixed_ref,
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('project', sa.String(255), nullable=False),
|
||||
sa.Column('ref', sa.String(255), nullable=False),
|
||||
sa.Column('ref_url', sa.String(255), nullable=False),
|
||||
sa.Column('change', sa.Integer, nullable=False),
|
||||
sa.Column('patchset', sa.String(40), nullable=False),
|
||||
sa.Column('oldrev', sa.String(40), nullable=False),
|
||||
sa.Column('newrev', sa.String(40), nullable=False),
|
||||
sa.Column('branch', sa.String(255), nullable=False),
|
||||
)
|
||||
|
||||
# Copy data from buildset to ref.
|
||||
|
||||
# We are going to have a unique index later on some columns, so we
|
||||
# use a "group by" clause here to remove duplicates. We also may
|
||||
# have differing values for ref_url for the same refs (e.g.,
|
||||
# opendev switched gerrit server hostnames), so we arbitrarily
|
||||
# take the first ref_url for a given grouping. It doesn't make
|
||||
# sense for branch to be different, but we do the same in order to
|
||||
# avoid any potential errors.
|
||||
statement = f"""
|
||||
insert into {prefixed_ref}
|
||||
(project, {quote('change')}, patchset,
|
||||
ref, ref_url, oldrev, newrev, branch)
|
||||
select
|
||||
bs.project, bs.change, bs.patchset,
|
||||
bs.ref, min(ref_url), bs.oldrev, bs.newrev, min(branch)
|
||||
from {prefixed_buildset_new} bs
|
||||
group by
|
||||
bs.project, bs.change, bs.patchset,
|
||||
bs.ref, bs.oldrev, bs.newrev
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
|
||||
# Create our unique ref constraint; this includes as index that
|
||||
# will speed up populating the zuul_buildset_ref table.
|
||||
op.create_unique_constraint(
|
||||
f'{prefixed_ref}_unique',
|
||||
prefixed_ref,
|
||||
['project', 'ref', 'change', 'patchset', 'oldrev', 'newrev'],
|
||||
)
|
||||
|
||||
# Create replacement indexes for the obsolete indexes on the
|
||||
# buildset table.
|
||||
with op.batch_alter_table(prefixed_ref) as batch_op:
|
||||
batch_op.create_index(
|
||||
f'{prefixed_ref}_project_change_idx',
|
||||
['project', 'change'])
|
||||
batch_op.create_index(
|
||||
f'{prefixed_ref}_change_idx',
|
||||
['change'])
|
||||
|
||||
# Add mapping table for buildset <-> ref
|
||||
|
||||
# We will add foreign key constraints after populating the table.
|
||||
op.create_table(
|
||||
prefixed_buildset_ref,
|
||||
sa.Column('buildset_id', sa.Integer, nullable=False),
|
||||
sa.Column('ref_id', sa.Integer, nullable=False),
|
||||
sa.PrimaryKeyConstraint("buildset_id", "ref_id"),
|
||||
)
|
||||
|
||||
# Populate buildset_ref table. Ignore ref_url since we don't
|
||||
# include it in the unique index later.
|
||||
statement = f"""
|
||||
insert into {prefixed_buildset_ref}
|
||||
select {prefixed_buildset_new}.id, {prefixed_ref}.id
|
||||
from {prefixed_buildset_new} left join {prefixed_ref}
|
||||
on {prefixed_buildset_new}.project = {prefixed_ref}.project
|
||||
and {prefixed_buildset_new}.ref = {prefixed_ref}.ref
|
||||
and {prefixed_buildset_new}.change = {prefixed_ref}.change
|
||||
and {prefixed_buildset_new}.patchset = {prefixed_ref}.patchset
|
||||
and {prefixed_buildset_new}.oldrev = {prefixed_ref}.oldrev
|
||||
and {prefixed_buildset_new}.newrev = {prefixed_ref}.newrev
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
|
||||
# Fix the sequence value since we wrote our own ids (postgres only)
|
||||
if dialect_name == 'postgresql':
|
||||
statement = f"""
|
||||
select setval(
|
||||
'{prefixed_buildset_new}_id_seq',
|
||||
COALESCE((SELECT MAX(id)+1 FROM {prefixed_buildset_new}), 1),
|
||||
false)
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
|
||||
# Now that the table is populated, add the FK indexes and
|
||||
# constraints to buildset_ref.
|
||||
op.create_index(
|
||||
f'{prefixed_buildset_ref}_buildset_id_idx',
|
||||
prefixed_buildset_ref, ['buildset_id'])
|
||||
op.create_index(
|
||||
f'{prefixed_buildset_ref}_ref_id_idx',
|
||||
prefixed_buildset_ref, ['ref_id'])
|
||||
|
||||
# Alembic doesn't allow us to combine alter table operations, so
|
||||
# we do this manually. It still takes the same total time in
|
||||
# mysql though.
|
||||
statement = f"""
|
||||
alter table {prefixed_buildset_ref}
|
||||
add constraint {prefixed_buildset_ref}_buildset_id_fkey
|
||||
foreign key(buildset_id)
|
||||
references {prefixed_buildset_new} (id),
|
||||
add constraint {prefixed_buildset_ref}_ref_id_fkey
|
||||
foreign key(ref_id)
|
||||
references {prefixed_ref} (id)
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
|
||||
# Our goal below is to add the ref_id column to the build table
|
||||
# and populate it with a query.
|
||||
|
||||
# Create the new build table.
|
||||
op.create_table(
|
||||
prefixed_build_new,
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('buildset_id', sa.Integer),
|
||||
sa.Column('uuid', sa.String(36)),
|
||||
sa.Column('job_name', sa.String(255)),
|
||||
sa.Column('result', sa.String(255)),
|
||||
sa.Column('start_time', sa.DateTime),
|
||||
sa.Column('end_time', sa.DateTime),
|
||||
sa.Column('voting', sa.Boolean),
|
||||
sa.Column('log_url', sa.String(255)),
|
||||
sa.Column('error_detail', sa.TEXT()),
|
||||
sa.Column('final', sa.Boolean),
|
||||
sa.Column('held', sa.Boolean),
|
||||
sa.Column('nodeset', sa.String(255)),
|
||||
sa.Column('ref_id', sa.Integer),
|
||||
)
|
||||
|
||||
# Populate it with existing values, but get the ref_id for the
|
||||
# build from the buildset_ref table (we know that currently a
|
||||
# buildset is associated with exactly one ref, so we can use that
|
||||
# ref to associate its builds).
|
||||
statement = f"""
|
||||
insert into {prefixed_build_new}
|
||||
select
|
||||
{prefixed_build}.id,
|
||||
{prefixed_build}.buildset_id,
|
||||
{prefixed_build}.uuid,
|
||||
{prefixed_build}.job_name,
|
||||
{prefixed_build}.result,
|
||||
{prefixed_build}.start_time,
|
||||
{prefixed_build}.end_time,
|
||||
{prefixed_build}.voting,
|
||||
{prefixed_build}.log_url,
|
||||
{prefixed_build}.error_detail,
|
||||
{prefixed_build}.final,
|
||||
{prefixed_build}.held,
|
||||
{prefixed_build}.nodeset,
|
||||
{prefixed_buildset_ref}.ref_id
|
||||
from {prefixed_build} left join {prefixed_buildset_ref}
|
||||
on {prefixed_build}.buildset_id =
|
||||
{prefixed_buildset_ref}.buildset_id
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
|
||||
# Fix the sequence value since we wrote our own ids (postgres only)
|
||||
if dialect_name == 'postgresql':
|
||||
statement = f"""
|
||||
select setval(
|
||||
'{prefixed_build_new}_id_seq',
|
||||
COALESCE((SELECT MAX(id)+1 FROM {prefixed_build_new}), 1),
|
||||
false)
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
|
||||
# Add the foreign key indexes and constraits to our new table
|
||||
# first, to make sure we can before we drop the old one.
|
||||
with op.batch_alter_table(prefixed_build_new) as batch_op:
|
||||
batch_op.create_index(
|
||||
f'{prefixed_build_new}_buildset_id_idx',
|
||||
['buildset_id'])
|
||||
batch_op.create_index(
|
||||
f'{prefixed_build_new}_ref_id_idx',
|
||||
['ref_id'])
|
||||
|
||||
# Mysql is quite slow about adding FK constraints. But we're
|
||||
# pretty sure the data are valid (one of these constraints is
|
||||
# identical to one on the buildset table, and the other should be
|
||||
# identical to one on the buildset_ref table).
|
||||
# Disable FK checks from this point forward.
|
||||
if dialect_name == 'mysql':
|
||||
connection.execute(sa.text("set foreign_key_checks = 0"))
|
||||
|
||||
statement = f"""
|
||||
alter table {prefixed_build_new}
|
||||
add constraint {prefixed_build_new}_buildset_id_fkey
|
||||
foreign key(buildset_id)
|
||||
references {prefixed_buildset_new} (id),
|
||||
add constraint {prefixed_build_new}_ref_id_fkey
|
||||
foreign key(ref_id)
|
||||
references {prefixed_ref} (id)
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
|
||||
# After this point we expect everything else to succeed and
|
||||
# we can no longer roll back in mysql without data loss.
|
||||
|
||||
|
||||
def rollback(connection, table_prefix):
|
||||
# This is our homemade best-effort rollback for mysql.
|
||||
prefixed_ref = table_prefix + REF_TABLE
|
||||
prefixed_build_new = table_prefix + BUILD_TABLE + '_new'
|
||||
prefixed_buildset_new = table_prefix + BUILDSET_TABLE + '_new'
|
||||
prefixed_buildset_ref = table_prefix + BUILDSET_REF_TABLE
|
||||
dialect_name = connection.engine.dialect.name
|
||||
|
||||
connection.execute(sa.text(
|
||||
f"drop table if exists {prefixed_ref}"))
|
||||
connection.execute(sa.text(
|
||||
f"drop table if exists {prefixed_build_new}"))
|
||||
connection.execute(sa.text(
|
||||
f"drop table if exists {prefixed_buildset_new}"))
|
||||
connection.execute(sa.text(
|
||||
f"drop table if exists {prefixed_buildset_ref}"))
|
||||
if dialect_name == 'mysql':
|
||||
connection.execute(sa.text("set foreign_key_checks = 1"))
|
||||
|
||||
|
||||
def upgrade2(connection, table_prefix):
|
||||
# This is the second part of the migration, past the point of no
|
||||
# return for mysql.
|
||||
prefixed_ref = table_prefix + REF_TABLE
|
||||
prefixed_build = table_prefix + BUILD_TABLE
|
||||
prefixed_build_new = table_prefix + BUILD_TABLE + '_new'
|
||||
prefixed_buildset = table_prefix + BUILDSET_TABLE
|
||||
prefixed_buildset_new = table_prefix + BUILDSET_TABLE + '_new'
|
||||
prefixed_artifact = table_prefix + ARTIFACT_TABLE
|
||||
prefixed_build_event = table_prefix + BUILD_EVENT_TABLE
|
||||
prefixed_provides = table_prefix + PROVIDES_TABLE
|
||||
quote = connection.engine.dialect.identifier_preparer.quote
|
||||
dialect_name = connection.engine.dialect.name
|
||||
|
||||
# Temporarily drop the FK constraints that reference the old build
|
||||
# table. (This conditional is why we're renaming all the indexes
|
||||
# and constraints to be consistent across different backends).
|
||||
if dialect_name == 'mysql':
|
||||
op.drop_constraint(table_prefix + 'zuul_artifact_ibfk_1',
|
||||
prefixed_artifact, 'foreignkey')
|
||||
op.drop_constraint(table_prefix + 'zuul_build_event_ibfk_1',
|
||||
prefixed_build_event, 'foreignkey')
|
||||
op.drop_constraint(table_prefix + 'zuul_provides_ibfk_1',
|
||||
prefixed_provides, 'foreignkey')
|
||||
elif dialect_name == 'postgresql':
|
||||
op.drop_constraint(table_prefix + 'zuul_artifact_build_id_fkey',
|
||||
prefixed_artifact)
|
||||
op.drop_constraint(table_prefix + 'zuul_build_event_build_id_fkey',
|
||||
prefixed_build_event)
|
||||
op.drop_constraint(table_prefix + 'zuul_provides_build_id_fkey',
|
||||
prefixed_provides)
|
||||
else:
|
||||
raise Exception(f"Unsupported dialect {dialect_name}")
|
||||
|
||||
# Drop the old table
|
||||
op.drop_table(prefixed_build)
|
||||
|
||||
# Rename the table
|
||||
op.rename_table(prefixed_build_new, prefixed_build)
|
||||
|
||||
# Rename the sequence and primary key (postgres only)
|
||||
if dialect_name == 'postgresql':
|
||||
statement = f"""
|
||||
alter sequence {prefixed_build_new}_id_seq
|
||||
rename to {prefixed_build}_id_seq;
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
rename_index(connection, prefixed_build_new,
|
||||
f'{prefixed_build_new}_pkey',
|
||||
f'{prefixed_build}_pkey')
|
||||
|
||||
# Replace the indexes
|
||||
with op.batch_alter_table(prefixed_build) as batch_op:
|
||||
# This used to be named job_name_buildset_id_idx, let's
|
||||
# upgrade to our new naming scheme
|
||||
batch_op.create_index(
|
||||
f'{prefixed_build}_job_name_buildset_id_idx',
|
||||
['job_name', 'buildset_id'])
|
||||
# Previously named uuid_buildset_id_idx
|
||||
batch_op.create_index(
|
||||
f'{prefixed_build}_uuid_buildset_id_idx',
|
||||
['uuid', 'buildset_id'])
|
||||
|
||||
# Rename indexes
|
||||
rename_index(connection, prefixed_build,
|
||||
f'{prefixed_build_new}_buildset_id_idx',
|
||||
f'{prefixed_build}_buildset_id_idx')
|
||||
rename_index(connection, prefixed_build,
|
||||
f'{prefixed_build_new}_ref_id_idx',
|
||||
f'{prefixed_build}_ref_id_idx')
|
||||
|
||||
# Mysql does not support renaming constraints, so we drop and
|
||||
# re-add them. We added them earlier to confirm that there were
|
||||
# no errors before dropping the original table (though in mysql we
|
||||
# did so with checks disabled, but postgres was able to validate
|
||||
# them. We could have skipped the earlier add for mysql, but this
|
||||
# keeps the code simpler and more consistent, and is still fast).
|
||||
statement = f"""
|
||||
alter table {prefixed_build}
|
||||
drop constraint {prefixed_build_new}_buildset_id_fkey,
|
||||
drop constraint {prefixed_build_new}_ref_id_fkey,
|
||||
add constraint {prefixed_build}_buildset_id_fkey
|
||||
foreign key(buildset_id)
|
||||
references {prefixed_buildset_new} (id),
|
||||
add constraint {prefixed_build}_ref_id_fkey
|
||||
foreign key(ref_id)
|
||||
references {prefixed_ref} (id)
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
|
||||
# Re-add the referencing FK constraints
|
||||
op.create_foreign_key(
|
||||
f'{prefixed_artifact}_build_id_fkey',
|
||||
prefixed_artifact,
|
||||
prefixed_build,
|
||||
['build_id'], ['id'])
|
||||
op.create_foreign_key(
|
||||
f'{prefixed_build_event}_build_id_fkey',
|
||||
prefixed_build_event,
|
||||
prefixed_build,
|
||||
['build_id'], ['id'])
|
||||
op.create_foreign_key(
|
||||
f'{prefixed_provides}_build_id_fkey',
|
||||
prefixed_provides,
|
||||
prefixed_build,
|
||||
['build_id'], ['id'])
|
||||
|
||||
# Rename some indexes for a consistent naming scheme
|
||||
rename_index(connection, prefixed_artifact,
|
||||
f'{table_prefix}artifact_build_id_idx',
|
||||
f'{prefixed_artifact}_build_id_idx')
|
||||
rename_index(connection, prefixed_build_event,
|
||||
f'{table_prefix}build_event_build_id_idx',
|
||||
f'{prefixed_build_event}_build_id_idx')
|
||||
rename_index(connection, prefixed_provides,
|
||||
f'{table_prefix}provides_build_id_idx',
|
||||
f'{prefixed_provides}_build_id_idx')
|
||||
|
||||
# Rename the buildset table
|
||||
op.drop_table(prefixed_buildset)
|
||||
op.rename_table(prefixed_buildset_new, prefixed_buildset)
|
||||
|
||||
# Rename the sequence and primary key (postgres only)
|
||||
if dialect_name == 'postgresql':
|
||||
statement = f"""
|
||||
alter sequence {prefixed_buildset_new}_id_seq
|
||||
rename to {prefixed_buildset}_id_seq;
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
rename_index(connection, prefixed_build_new,
|
||||
f'{prefixed_buildset_new}_pkey',
|
||||
f'{prefixed_buildset}_pkey')
|
||||
|
||||
# Drop the columns that are no longer used in one statement for
|
||||
# efficiency (alembic doesn't have a way to do this).
|
||||
statement = f"""alter table {prefixed_buildset}
|
||||
drop column project,
|
||||
drop column {quote('change')},
|
||||
drop column patchset,
|
||||
drop column ref,
|
||||
drop column ref_url,
|
||||
drop column oldrev,
|
||||
drop column newrev,
|
||||
drop column branch
|
||||
"""
|
||||
connection.execute(sa.text(statement))
|
||||
|
||||
# Replace the only remaining buildset index
|
||||
op.create_index(
|
||||
f'{prefixed_buildset}_uuid_idx',
|
||||
prefixed_buildset,
|
||||
['uuid'])
|
||||
|
||||
# Re-enable FK checks for mysql
|
||||
if dialect_name == 'mysql':
|
||||
connection.execute(sa.text("set foreign_key_checks = 1"))
|
||||
|
||||
|
||||
def upgrade(table_prefix=''):
|
||||
# The actual upgrade method, simplified for exception/rollback
|
||||
# handling.
|
||||
connection = op.get_bind()
|
||||
dialect_name = connection.engine.dialect.name
|
||||
if dialect_name not in ['mysql', 'postgresql']:
|
||||
raise Exception(f"Unsupported dialect {dialect_name}")
|
||||
|
||||
log = logging.getLogger('zuul.SQLMigration')
|
||||
try:
|
||||
upgrade1(connection, table_prefix)
|
||||
except Exception:
|
||||
try:
|
||||
if dialect_name == 'mysql':
|
||||
log.error("Early error in schema migration, rolling back")
|
||||
rollback(connection, table_prefix)
|
||||
except Exception:
|
||||
log.exception("Error in migration rollback:")
|
||||
raise
|
||||
upgrade2(connection, table_prefix)
|
||||
|
||||
|
||||
def downgrade():
|
||||
raise Exception("Downgrades not supported")
|
||||
@@ -1,4 +1,5 @@
|
||||
# Copyright 2014 Rackspace Australia
|
||||
# Copyright 2023 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@@ -30,6 +31,8 @@ from zuul.zk.locks import CONNECTION_LOCK_ROOT, locked, SessionAwareLock
|
||||
|
||||
|
||||
BUILDSET_TABLE = 'zuul_buildset'
|
||||
REF_TABLE = 'zuul_ref'
|
||||
BUILDSET_REF_TABLE = 'zuul_buildset_ref'
|
||||
BUILD_TABLE = 'zuul_build'
|
||||
BUILD_EVENTS_TABLE = 'zuul_build_event'
|
||||
ARTIFACT_TABLE = 'zuul_artifact'
|
||||
@@ -52,8 +55,53 @@ def _set_timeout(conn, cursor, stmt, params, context, executemany):
|
||||
cursor.execute("SET LOCAL statement_timeout=%s" % match.groups())
|
||||
|
||||
|
||||
class DatabaseSession(object):
|
||||
class ChangeType(sa.TypeDecorator):
|
||||
"""Coerces NULL/None values to/from the integer 0 to facilitate use in
|
||||
non-nullable columns
|
||||
|
||||
"""
|
||||
# Underlying implementation
|
||||
impl = sa.Integer
|
||||
# We produce consistent values, so can be cached
|
||||
cache_ok = True
|
||||
# Don't use "IS NULL" when comparing to None values
|
||||
coerce_to_is_types = ()
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
if value is None:
|
||||
return 0
|
||||
return value
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
if value == 0:
|
||||
return None
|
||||
return value
|
||||
|
||||
|
||||
class SHAType(sa.TypeDecorator):
|
||||
"""Coerces NULL/None values to/from the empty string to facilitate use
|
||||
in non-nullable columns
|
||||
|
||||
"""
|
||||
# Underlying implementation
|
||||
impl = sa.String
|
||||
# We produce consistent values, so can be cached
|
||||
cache_ok = True
|
||||
# Don't use "IS NULL" when comparing to None values
|
||||
coerce_to_is_types = ()
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
if value is None:
|
||||
return ''
|
||||
return value
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
if value == '':
|
||||
return None
|
||||
return value
|
||||
|
||||
|
||||
class DatabaseSession(object):
|
||||
log = logging.getLogger("zuul.DatabaseSession")
|
||||
|
||||
def __init__(self, connection):
|
||||
@@ -96,6 +144,7 @@ class DatabaseSession(object):
|
||||
offset=0, idx_min=None, idx_max=None,
|
||||
exclude_result=None, query_timeout=None):
|
||||
|
||||
ref_table = self.connection.zuul_ref_table
|
||||
build_table = self.connection.zuul_build_table
|
||||
buildset_table = self.connection.zuul_buildset_table
|
||||
provides_table = self.connection.zuul_provides_table
|
||||
@@ -105,8 +154,10 @@ class DatabaseSession(object):
|
||||
# joinedload).
|
||||
q = self.session().query(self.connection.buildModel).\
|
||||
join(self.connection.buildSetModel).\
|
||||
join(self.connection.refModel).\
|
||||
outerjoin(self.connection.providesModel).\
|
||||
options(orm.contains_eager(self.connection.buildModel.buildset),
|
||||
orm.contains_eager(self.connection.buildModel.ref),
|
||||
orm.selectinload(self.connection.buildModel.provides),
|
||||
orm.selectinload(self.connection.buildModel.artifacts))
|
||||
# If the query planner isn't able to reduce either the number
|
||||
@@ -134,13 +185,13 @@ class DatabaseSession(object):
|
||||
dialect_name='postgresql')
|
||||
|
||||
q = self.listFilter(q, buildset_table.c.tenant, tenant)
|
||||
q = self.listFilter(q, buildset_table.c.project, project)
|
||||
q = self.listFilter(q, buildset_table.c.pipeline, pipeline)
|
||||
q = self.listFilter(q, buildset_table.c.change, change)
|
||||
q = self.listFilter(q, buildset_table.c.branch, branch)
|
||||
q = self.listFilter(q, buildset_table.c.patchset, patchset)
|
||||
q = self.listFilter(q, buildset_table.c.ref, ref)
|
||||
q = self.listFilter(q, buildset_table.c.newrev, newrev)
|
||||
q = self.listFilter(q, ref_table.c.project, project)
|
||||
q = self.listFilter(q, ref_table.c.change, change)
|
||||
q = self.listFilter(q, ref_table.c.branch, branch)
|
||||
q = self.listFilter(q, ref_table.c.patchset, patchset)
|
||||
q = self.listFilter(q, ref_table.c.ref, ref)
|
||||
q = self.listFilter(q, ref_table.c.newrev, newrev)
|
||||
q = self.listFilter(q, buildset_table.c.event_id, event_id)
|
||||
q = self.listFilter(
|
||||
q, buildset_table.c.event_timestamp, event_timestamp)
|
||||
@@ -210,6 +261,31 @@ class DatabaseSession(object):
|
||||
self.session().flush()
|
||||
return bs
|
||||
|
||||
def getOrCreateRef(self, project, ref, ref_url,
|
||||
change=None, patchset=None, branch=None,
|
||||
oldrev=None, newrev=None):
|
||||
ref_table = self.connection.zuul_ref_table
|
||||
q = self.session().query(self.connection.refModel)
|
||||
# We only query for the columns that we include in the unique
|
||||
# constraint (i.e., we omit branch and ref_url which should be
|
||||
# guaranteed to be unique by ref).
|
||||
q = q.filter(ref_table.c.project == project,
|
||||
ref_table.c.ref == ref,
|
||||
ref_table.c.change == change,
|
||||
ref_table.c.patchset == patchset,
|
||||
ref_table.c.oldrev == oldrev,
|
||||
ref_table.c.newrev == newrev)
|
||||
ret = q.all()
|
||||
if ret:
|
||||
return ret[0]
|
||||
ret = self.connection.refModel(
|
||||
project=project, ref=ref, ref_url=ref_url,
|
||||
change=change, patchset=patchset, branch=branch,
|
||||
oldrev=oldrev, newrev=newrev)
|
||||
self.session().add(ret)
|
||||
self.session().flush()
|
||||
return ret
|
||||
|
||||
def getBuildsets(self, tenant=None, project=None, pipeline=None,
|
||||
change=None, branch=None, patchset=None, ref=None,
|
||||
newrev=None, uuid=None, result=None, complete=None,
|
||||
@@ -218,9 +294,13 @@ class DatabaseSession(object):
|
||||
query_timeout=None):
|
||||
|
||||
buildset_table = self.connection.zuul_buildset_table
|
||||
ref_table = self.connection.zuul_ref_table
|
||||
|
||||
# See note above about the hint.
|
||||
q = self.session().query(self.connection.buildSetModel)
|
||||
q = self.session().query(self.connection.buildSetModel).\
|
||||
join(self.connection.buildSetRefModel).\
|
||||
join(self.connection.refModel).\
|
||||
options(orm.contains_eager(self.connection.buildSetModel.refs))
|
||||
if not (project or change or uuid):
|
||||
q = q.with_hint(buildset_table, 'USE INDEX (PRIMARY)', 'mysql')
|
||||
|
||||
@@ -236,13 +316,13 @@ class DatabaseSession(object):
|
||||
dialect_name='postgresql')
|
||||
|
||||
q = self.listFilter(q, buildset_table.c.tenant, tenant)
|
||||
q = self.listFilter(q, buildset_table.c.project, project)
|
||||
q = self.listFilter(q, buildset_table.c.pipeline, pipeline)
|
||||
q = self.listFilter(q, buildset_table.c.change, change)
|
||||
q = self.listFilter(q, buildset_table.c.branch, branch)
|
||||
q = self.listFilter(q, buildset_table.c.patchset, patchset)
|
||||
q = self.listFilter(q, buildset_table.c.ref, ref)
|
||||
q = self.listFilter(q, buildset_table.c.newrev, newrev)
|
||||
q = self.listFilter(q, ref_table.c.project, project)
|
||||
q = self.listFilter(q, ref_table.c.change, change)
|
||||
q = self.listFilter(q, ref_table.c.branch, branch)
|
||||
q = self.listFilter(q, ref_table.c.patchset, patchset)
|
||||
q = self.listFilter(q, ref_table.c.ref, ref)
|
||||
q = self.listFilter(q, ref_table.c.newrev, newrev)
|
||||
q = self.listFilter(q, buildset_table.c.uuid, uuid)
|
||||
q = self.listFilter(q, buildset_table.c.result, result)
|
||||
if idx_min:
|
||||
@@ -273,6 +353,7 @@ class DatabaseSession(object):
|
||||
buildset_table = self.connection.zuul_buildset_table
|
||||
|
||||
q = self.session().query(self.connection.buildSetModel).\
|
||||
options(orm.joinedload(self.connection.buildSetModel.refs)).\
|
||||
options(orm.joinedload(self.connection.buildSetModel.builds).
|
||||
subqueryload(self.connection.buildModel.artifacts)).\
|
||||
options(orm.joinedload(self.connection.buildSetModel.builds).
|
||||
@@ -330,7 +411,6 @@ class SQLConnection(BaseConnection):
|
||||
try:
|
||||
self.dburi = self.connection_config.get('dburi')
|
||||
self.metadata = sa.MetaData()
|
||||
self._setup_models()
|
||||
|
||||
# Recycle connections if they've been idle for more than 1 second.
|
||||
# MySQL connections are lightweight and thus keeping long-lived
|
||||
@@ -341,6 +421,8 @@ class SQLConnection(BaseConnection):
|
||||
pool_recycle=self.connection_config.get('pool_recycle', 1),
|
||||
future=True)
|
||||
|
||||
self._setup_models()
|
||||
|
||||
# If we want the objects returned from query() to be
|
||||
# usable outside of the session, we need to expunge them
|
||||
# from the session, and since the DatabaseSession always
|
||||
@@ -408,22 +490,32 @@ class SQLConnection(BaseConnection):
|
||||
def _setup_models(self):
|
||||
Base = orm.declarative_base(metadata=self.metadata)
|
||||
|
||||
class RefModel(Base):
|
||||
__tablename__ = self.table_prefix + REF_TABLE
|
||||
id = sa.Column(sa.Integer, primary_key=True)
|
||||
project = sa.Column(sa.String(255), nullable=False)
|
||||
ref = sa.Column(sa.String(255), nullable=False)
|
||||
ref_url = sa.Column(sa.String(255), nullable=False)
|
||||
change = sa.Column(ChangeType, nullable=False)
|
||||
patchset = sa.Column(SHAType(40), nullable=False)
|
||||
oldrev = sa.Column(SHAType(40), nullable=False)
|
||||
newrev = sa.Column(SHAType(40), nullable=False)
|
||||
branch = sa.Column(sa.String(255), nullable=False)
|
||||
|
||||
sa.Index(self.table_prefix + 'zuul_ref_project_change_idx',
|
||||
project, change)
|
||||
sa.Index(self.table_prefix + 'zuul_ref_change_idx', change)
|
||||
sa.UniqueConstraint(
|
||||
project, ref, change, patchset, oldrev, newrev,
|
||||
name=self.table_prefix + 'zuul_ref_unique')
|
||||
|
||||
class BuildSetModel(Base):
|
||||
__tablename__ = self.table_prefix + BUILDSET_TABLE
|
||||
id = sa.Column(sa.Integer, primary_key=True)
|
||||
zuul_ref = sa.Column(sa.String(255))
|
||||
pipeline = sa.Column(sa.String(255))
|
||||
project = sa.Column(sa.String(255))
|
||||
change = sa.Column(sa.Integer, nullable=True)
|
||||
patchset = sa.Column(sa.String(255), nullable=True)
|
||||
ref = sa.Column(sa.String(255))
|
||||
message = sa.Column(sa.TEXT())
|
||||
tenant = sa.Column(sa.String(255))
|
||||
result = sa.Column(sa.String(255))
|
||||
ref_url = sa.Column(sa.String(255))
|
||||
oldrev = sa.Column(sa.String(255))
|
||||
newrev = sa.Column(sa.String(255))
|
||||
branch = sa.Column(sa.String(255))
|
||||
uuid = sa.Column(sa.String(36))
|
||||
event_id = sa.Column(sa.String(255), nullable=True)
|
||||
event_timestamp = sa.Column(sa.DateTime, nullable=True)
|
||||
@@ -431,27 +523,46 @@ class SQLConnection(BaseConnection):
|
||||
last_build_end_time = sa.Column(sa.DateTime, nullable=True)
|
||||
updated = sa.Column(sa.DateTime, nullable=True)
|
||||
|
||||
sa.Index(self.table_prefix + 'project_pipeline_idx',
|
||||
project, pipeline)
|
||||
sa.Index(self.table_prefix + 'project_change_idx',
|
||||
project, change)
|
||||
sa.Index(self.table_prefix + 'change_idx', change)
|
||||
sa.Index(self.table_prefix + 'uuid_idx', uuid)
|
||||
refs = orm.relationship(
|
||||
RefModel,
|
||||
secondary=self.table_prefix + BUILDSET_REF_TABLE)
|
||||
sa.Index(self.table_prefix + 'zuul_buildset_uuid_idx', uuid)
|
||||
|
||||
def createBuild(self, *args, **kw):
|
||||
def createBuild(self, ref, *args, **kw):
|
||||
session = orm.session.Session.object_session(self)
|
||||
b = BuildModel(*args, **kw)
|
||||
b.buildset_id = self.id
|
||||
b.ref_id = ref.id
|
||||
self.builds.append(b)
|
||||
session.add(b)
|
||||
session.flush()
|
||||
return b
|
||||
|
||||
class BuildSetRefModel(Base):
|
||||
__tablename__ = self.table_prefix + BUILDSET_REF_TABLE
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint('buildset_id', 'ref_id'),
|
||||
)
|
||||
buildset_id = sa.Column(sa.Integer, sa.ForeignKey(
|
||||
self.table_prefix + BUILDSET_TABLE + ".id",
|
||||
name=self.table_prefix + 'zuul_buildset_ref_buildset_id_fkey',
|
||||
))
|
||||
ref_id = sa.Column(sa.Integer, sa.ForeignKey(
|
||||
self.table_prefix + REF_TABLE + ".id",
|
||||
name=self.table_prefix + 'zuul_buildset_ref_ref_id_fkey',
|
||||
))
|
||||
sa.Index(self.table_prefix + 'zuul_buildset_ref_buildset_id_idx',
|
||||
buildset_id)
|
||||
sa.Index(self.table_prefix + 'zuul_buildset_ref_ref_id_idx',
|
||||
ref_id)
|
||||
|
||||
class BuildModel(Base):
|
||||
__tablename__ = self.table_prefix + BUILD_TABLE
|
||||
id = sa.Column(sa.Integer, primary_key=True)
|
||||
buildset_id = sa.Column(sa.Integer, sa.ForeignKey(
|
||||
self.table_prefix + BUILDSET_TABLE + ".id"))
|
||||
self.table_prefix + BUILDSET_TABLE + ".id",
|
||||
name=self.table_prefix + 'zuul_build_buildset_id_fkey',
|
||||
))
|
||||
uuid = sa.Column(sa.String(36))
|
||||
job_name = sa.Column(sa.String(255))
|
||||
result = sa.Column(sa.String(255))
|
||||
@@ -463,17 +574,25 @@ class SQLConnection(BaseConnection):
|
||||
final = sa.Column(sa.Boolean)
|
||||
held = sa.Column(sa.Boolean)
|
||||
nodeset = sa.Column(sa.String(255))
|
||||
ref_id = sa.Column(sa.Integer, sa.ForeignKey(
|
||||
self.table_prefix + REF_TABLE + ".id",
|
||||
name=self.table_prefix + 'zuul_build_ref_id_fkey',
|
||||
))
|
||||
|
||||
buildset = orm.relationship(BuildSetModel,
|
||||
backref=orm.backref(
|
||||
"builds",
|
||||
cascade="all, delete-orphan"))
|
||||
ref = orm.relationship(RefModel)
|
||||
|
||||
sa.Index(self.table_prefix + 'job_name_buildset_id_idx',
|
||||
sa.Index(self.table_prefix + 'zuul_build_job_name_buildset_id_idx',
|
||||
job_name, buildset_id)
|
||||
sa.Index(self.table_prefix + 'uuid_buildset_id_idx',
|
||||
sa.Index(self.table_prefix + 'zuul_build_uuid_buildset_id_idx',
|
||||
uuid, buildset_id)
|
||||
sa.Index(self.table_prefix + 'build_buildset_id_idx',
|
||||
sa.Index(self.table_prefix + 'zuul_build_buildset_id_idx',
|
||||
buildset_id)
|
||||
sa.Index(self.table_prefix + 'zuul_build_ref_id_idx',
|
||||
ref_id)
|
||||
|
||||
@property
|
||||
def duration(self):
|
||||
@@ -523,7 +642,9 @@ class SQLConnection(BaseConnection):
|
||||
__tablename__ = self.table_prefix + ARTIFACT_TABLE
|
||||
id = sa.Column(sa.Integer, primary_key=True)
|
||||
build_id = sa.Column(sa.Integer, sa.ForeignKey(
|
||||
self.table_prefix + BUILD_TABLE + ".id"))
|
||||
self.table_prefix + BUILD_TABLE + ".id",
|
||||
name=self.table_prefix + 'zuul_artifact_build_id_fkey',
|
||||
))
|
||||
name = sa.Column(sa.String(255))
|
||||
url = sa.Column(sa.TEXT())
|
||||
meta = sa.Column('metadata', sa.TEXT())
|
||||
@@ -531,27 +652,31 @@ class SQLConnection(BaseConnection):
|
||||
backref=orm.backref(
|
||||
"artifacts",
|
||||
cascade="all, delete-orphan"))
|
||||
sa.Index(self.table_prefix + 'artifact_build_id_idx',
|
||||
sa.Index(self.table_prefix + 'zuul_artifact_build_id_idx',
|
||||
build_id)
|
||||
|
||||
class ProvidesModel(Base):
|
||||
__tablename__ = self.table_prefix + PROVIDES_TABLE
|
||||
id = sa.Column(sa.Integer, primary_key=True)
|
||||
build_id = sa.Column(sa.Integer, sa.ForeignKey(
|
||||
self.table_prefix + BUILD_TABLE + ".id"))
|
||||
self.table_prefix + BUILD_TABLE + ".id",
|
||||
name=self.table_prefix + 'zuul_provides_build_id_fkey',
|
||||
))
|
||||
name = sa.Column(sa.String(255))
|
||||
build = orm.relationship(BuildModel,
|
||||
backref=orm.backref(
|
||||
"provides",
|
||||
cascade="all, delete-orphan"))
|
||||
sa.Index(self.table_prefix + 'provides_build_id_idx',
|
||||
sa.Index(self.table_prefix + 'zuul_provides_build_id_idx',
|
||||
build_id)
|
||||
|
||||
class BuildEventModel(Base):
|
||||
__tablename__ = self.table_prefix + BUILD_EVENTS_TABLE
|
||||
id = sa.Column(sa.Integer, primary_key=True)
|
||||
build_id = sa.Column(sa.Integer, sa.ForeignKey(
|
||||
self.table_prefix + BUILD_TABLE + ".id"))
|
||||
self.table_prefix + BUILD_TABLE + ".id",
|
||||
name=self.table_prefix + 'zuul_build_event_build_id_fkey',
|
||||
))
|
||||
event_time = sa.Column(sa.DateTime)
|
||||
event_type = sa.Column(sa.String(255))
|
||||
description = sa.Column(sa.TEXT())
|
||||
@@ -559,7 +684,7 @@ class SQLConnection(BaseConnection):
|
||||
backref=orm.backref(
|
||||
"build_events",
|
||||
cascade="all, delete-orphan"))
|
||||
sa.Index(self.table_prefix + 'build_event_build_id_idx',
|
||||
sa.Index(self.table_prefix + 'zuul_build_event_build_id_idx',
|
||||
build_id)
|
||||
|
||||
self.buildEventModel = BuildEventModel
|
||||
@@ -577,6 +702,12 @@ class SQLConnection(BaseConnection):
|
||||
self.buildSetModel = BuildSetModel
|
||||
self.zuul_buildset_table = self.buildSetModel.__table__
|
||||
|
||||
self.refModel = RefModel
|
||||
self.zuul_ref_table = self.refModel.__table__
|
||||
|
||||
self.buildSetRefModel = BuildSetRefModel
|
||||
self.zuul_buildset_ref_table = self.buildSetRefModel.__table__
|
||||
|
||||
def onStop(self):
|
||||
self.log.debug("Stopping SQL connection %s" % self.connection_name)
|
||||
self.engine.dispose()
|
||||
|
||||
@@ -53,23 +53,26 @@ class SQLReporter(BaseReporter):
|
||||
event_id = getattr(item.event, "zuul_event_id", None)
|
||||
event_timestamp = datetime.datetime.fromtimestamp(
|
||||
item.event.timestamp, tz=datetime.timezone.utc)
|
||||
db_buildset = db.createBuildSet(
|
||||
uuid=buildset.uuid,
|
||||
tenant=item.pipeline.tenant.name,
|
||||
pipeline=item.pipeline.name,
|
||||
|
||||
ref = db.getOrCreateRef(
|
||||
project=item.change.project.name,
|
||||
change=getattr(item.change, 'number', None),
|
||||
patchset=getattr(item.change, 'patchset', None),
|
||||
ref_url=item.change.url,
|
||||
ref=getattr(item.change, 'ref', ''),
|
||||
oldrev=getattr(item.change, 'oldrev', ''),
|
||||
newrev=getattr(item.change, 'newrev', ''),
|
||||
branch=getattr(item.change, 'branch', ''),
|
||||
zuul_ref=buildset.ref,
|
||||
ref_url=item.change.url,
|
||||
)
|
||||
db_buildset = db.createBuildSet(
|
||||
uuid=buildset.uuid,
|
||||
tenant=item.pipeline.tenant.name,
|
||||
pipeline=item.pipeline.name,
|
||||
event_id=event_id,
|
||||
event_timestamp=event_timestamp,
|
||||
updated=datetime.datetime.utcnow(),
|
||||
)
|
||||
db_buildset.refs.append(ref)
|
||||
return db_buildset
|
||||
|
||||
def reportBuildsetStart(self, buildset):
|
||||
@@ -196,8 +199,20 @@ class SQLReporter(BaseReporter):
|
||||
db_buildset = self._createBuildset(db, buildset)
|
||||
if db_buildset.first_build_start_time is None:
|
||||
db_buildset.first_build_start_time = start
|
||||
item = buildset.item
|
||||
ref = db.getOrCreateRef(
|
||||
project=item.change.project.name,
|
||||
change=getattr(item.change, 'number', None),
|
||||
patchset=getattr(item.change, 'patchset', None),
|
||||
ref_url=item.change.url,
|
||||
ref=getattr(item.change, 'ref', ''),
|
||||
oldrev=getattr(item.change, 'oldrev', ''),
|
||||
newrev=getattr(item.change, 'newrev', ''),
|
||||
branch=getattr(item.change, 'branch', ''),
|
||||
)
|
||||
|
||||
db_build = db_buildset.createBuild(
|
||||
ref=ref,
|
||||
uuid=build.uuid,
|
||||
job_name=build.job.name,
|
||||
start_time=start,
|
||||
|
||||
@@ -5392,15 +5392,15 @@ class QueueItem(zkobject.ZKObject):
|
||||
'(triggered by change %s on project %s), but that build '
|
||||
'failed with result "%s"' % (
|
||||
job.name, ', '.join(requirement), build.uuid,
|
||||
build.buildset.change, build.buildset.project,
|
||||
build.ref.change, build.ref.project,
|
||||
build.result))
|
||||
else:
|
||||
for a in build.artifacts:
|
||||
artifact = {'name': a.name,
|
||||
'url': a.url,
|
||||
'project': build.buildset.project,
|
||||
'change': str(build.buildset.change),
|
||||
'patchset': build.buildset.patchset,
|
||||
'project': build.ref.project,
|
||||
'change': str(build.ref.change),
|
||||
'patchset': build.ref.patchset,
|
||||
'job': build.job_name}
|
||||
if a.meta:
|
||||
artifact['metadata'] = json.loads(a.meta)
|
||||
|
||||
@@ -1398,17 +1398,19 @@ class ZuulWebAPI(object):
|
||||
'provides': [],
|
||||
}
|
||||
|
||||
# TODO: This should not be conditional in the future, when we
|
||||
# can have multiple refs for a buildset.
|
||||
if buildset:
|
||||
event_timestamp = self._datetimeToString(buildset.event_timestamp)
|
||||
ret.update({
|
||||
'project': buildset.project,
|
||||
'branch': buildset.branch,
|
||||
'project': build.ref.project,
|
||||
'branch': build.ref.branch,
|
||||
'pipeline': buildset.pipeline,
|
||||
'change': buildset.change,
|
||||
'patchset': buildset.patchset,
|
||||
'ref': buildset.ref,
|
||||
'newrev': buildset.newrev,
|
||||
'ref_url': buildset.ref_url,
|
||||
'change': build.ref.change,
|
||||
'patchset': build.ref.patchset,
|
||||
'ref': build.ref.ref,
|
||||
'newrev': build.ref.newrev,
|
||||
'ref_url': build.ref.ref_url,
|
||||
'event_id': buildset.event_id,
|
||||
'event_timestamp': event_timestamp,
|
||||
'buildset': {
|
||||
@@ -1498,14 +1500,15 @@ class ZuulWebAPI(object):
|
||||
'uuid': buildset.uuid,
|
||||
'result': buildset.result,
|
||||
'message': buildset.message,
|
||||
'project': buildset.project,
|
||||
'branch': buildset.branch,
|
||||
'project': buildset.refs[0].project,
|
||||
'branch': buildset.refs[0].branch,
|
||||
'pipeline': buildset.pipeline,
|
||||
'change': buildset.change,
|
||||
'patchset': buildset.patchset,
|
||||
'ref': buildset.ref,
|
||||
'newrev': buildset.newrev,
|
||||
'ref_url': buildset.ref_url,
|
||||
'change': buildset.refs[0].change,
|
||||
'patchset': buildset.refs[0].patchset,
|
||||
'ref': buildset.refs[0].ref,
|
||||
'oldrev': buildset.refs[0].oldrev,
|
||||
'newrev': buildset.refs[0].newrev,
|
||||
'ref_url': buildset.refs[0].ref_url,
|
||||
'event_id': buildset.event_id,
|
||||
'event_timestamp': event_timestamp,
|
||||
'first_build_start_time': start,
|
||||
|
||||
Reference in New Issue
Block a user