SQLAlchemy 2.0: Omnibus fixes patch
This was originally five patches, but they are all needed to pass any of the test jobs now, so they have been squashed into one: Co-Authored-By: Dan Smith (dms@danplanet.com) First: The autoload argument was removed[1] in SQLAlchemy and only the autoload_with argument should be passed. The autoload argument is set according to the autoload_with argument automatically even in SQLAlchemy 1.x[2] so is not at all needed. [1]c932123bac
[2]ad8f921e96
Second: Remove _warn_on_bytestring for newer SA, AFAICT, this flag has been removed from SQLAlchemy and that is why watcher-db-manage fails to initialize the DB for me on jammy. This migration was passing the default value (=False) anyway, so I assume this is the right "fix". Third: Fix joinedload passing string attribute names Fourth: Fix engine.select pattern to use begin() per the migration guide. Fifth: Override the apscheduler get_next_run_time() which appears to be trivially not compatible with SQLAlchemy 2.0 because of a return type from scalar(). Change-Id: I000e5e78f97f82ed4ea64d42f1c38354c3252e08
This commit is contained in:
parent
bc5922c684
commit
d6f169197e
@ -28,7 +28,7 @@ def upgrade():
|
||||
|
||||
op.create_table(
|
||||
'apscheduler_jobs',
|
||||
sa.Column('id', sa.Unicode(191, _warn_on_bytestring=False),
|
||||
sa.Column('id', sa.Unicode(191),
|
||||
nullable=False),
|
||||
sa.Column('next_run_time', sa.Float(25), index=True),
|
||||
sa.Column('job_state', sa.LargeBinary, nullable=False),
|
||||
|
@ -244,7 +244,8 @@ class Connection(api.BaseConnection):
|
||||
for relationship in relationships:
|
||||
if not relationship.uselist:
|
||||
# We have a One-to-X relationship
|
||||
query = query.options(joinedload(relationship.key))
|
||||
query = query.options(joinedload(
|
||||
getattr(model, relationship.key)))
|
||||
return query
|
||||
|
||||
def _create(self, model, values):
|
||||
|
@ -22,6 +22,7 @@ from apscheduler.jobstores.base import ConflictingIdError
|
||||
from apscheduler.jobstores import sqlalchemy
|
||||
from apscheduler.util import datetime_to_utc_timestamp
|
||||
from apscheduler.util import maybe_ref
|
||||
from apscheduler.util import utc_timestamp_to_datetime
|
||||
|
||||
from watcher.common import context
|
||||
from watcher.common import service
|
||||
@ -32,7 +33,7 @@ try:
|
||||
except ImportError: # pragma: nocover
|
||||
import pickle
|
||||
|
||||
from sqlalchemy import Table, MetaData, select, and_
|
||||
from sqlalchemy import Table, MetaData, select, and_, null
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
|
||||
@ -58,8 +59,7 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
|
||||
super(WatcherJobStore, self).__init__(url, engine, tablename,
|
||||
metadata, pickle_protocol)
|
||||
metadata = maybe_ref(metadata) or MetaData()
|
||||
self.jobs_t = Table(tablename, metadata, autoload=True,
|
||||
autoload_with=engine)
|
||||
self.jobs_t = Table(tablename, metadata, autoload_with=engine)
|
||||
service_ident = service.ServiceHeartbeat.get_service_name()
|
||||
self.tag = tag or {'host': service_ident[0], 'name': service_ident[1]}
|
||||
self.service_id = objects.Service.list(context=context.make_context(),
|
||||
@ -79,7 +79,8 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
|
||||
'tag': jsonutils.dumps(self.tag)
|
||||
})
|
||||
try:
|
||||
self.engine.execute(insert)
|
||||
with self.engine.begin() as conn:
|
||||
conn.execute(insert)
|
||||
except IntegrityError:
|
||||
raise ConflictingIdError(job.id)
|
||||
|
||||
@ -88,20 +89,36 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
|
||||
self._fix_paused_jobs_sorting(jobs)
|
||||
return jobs
|
||||
|
||||
def get_next_run_time(self):
|
||||
selectable = select(self.jobs_t.c.next_run_time).\
|
||||
where(self.jobs_t.c.next_run_time != null()).\
|
||||
order_by(self.jobs_t.c.next_run_time).limit(1)
|
||||
with self.engine.begin() as connection:
|
||||
# NOTE(danms): The apscheduler implementation of this gets a
|
||||
# decimal.Decimal back from scalar() which causes
|
||||
# utc_timestamp_to_datetime() to choke since it is expecting a
|
||||
# python float. Assume this is SQLAlchemy 2.0 stuff, so just
|
||||
# coerce to a float here.
|
||||
next_run_time = connection.execute(selectable).scalar()
|
||||
return utc_timestamp_to_datetime(float(next_run_time)
|
||||
if next_run_time is not None
|
||||
else None)
|
||||
|
||||
def _get_jobs(self, *conditions):
|
||||
jobs = []
|
||||
conditions += (self.jobs_t.c.service_id == self.service_id,)
|
||||
selectable = select(
|
||||
[self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag]
|
||||
self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag
|
||||
).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions))
|
||||
failed_job_ids = set()
|
||||
for row in self.engine.execute(selectable):
|
||||
try:
|
||||
jobs.append(self._reconstitute_job(row.job_state))
|
||||
except Exception:
|
||||
self._logger.exception(
|
||||
'Unable to restore job "%s" -- removing it', row.id)
|
||||
failed_job_ids.add(row.id)
|
||||
with self.engine.begin() as conn:
|
||||
for row in conn.execute(selectable):
|
||||
try:
|
||||
jobs.append(self._reconstitute_job(row.job_state))
|
||||
except Exception:
|
||||
self._logger.exception(
|
||||
'Unable to restore job "%s" -- removing it', row.id)
|
||||
failed_job_ids.add(row.id)
|
||||
|
||||
# Remove all the jobs we failed to restore
|
||||
if failed_job_ids:
|
||||
|
Loading…
Reference in New Issue
Block a user