Merge "Serialize update of changes in timer driver"
This commit is contained in:
@@ -17,6 +17,7 @@
|
|||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
from collections import defaultdict
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from apscheduler.schedulers.background import BackgroundScheduler
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
@@ -42,6 +43,10 @@ class TimerDriver(Driver, TriggerInterface):
|
|||||||
self.election = None
|
self.election = None
|
||||||
self.election_thread = None
|
self.election_thread = None
|
||||||
self.election_won = False
|
self.election_won = False
|
||||||
|
# Mapping of locks: canonical project name -> lock
|
||||||
|
# The lock are used to avoid concurrent update errors when a
|
||||||
|
# lot of periodic pipelines are triggering simultanously.
|
||||||
|
self.project_update_locks = defaultdict(threading.Lock)
|
||||||
self.stop_event = threading.Event()
|
self.stop_event = threading.Event()
|
||||||
self.stopped = False
|
self.stopped = False
|
||||||
|
|
||||||
@@ -175,7 +180,8 @@ class TimerDriver(Driver, TriggerInterface):
|
|||||||
event.timestamp = time.time()
|
event.timestamp = time.time()
|
||||||
# Refresh the branch in order to update the item in the
|
# Refresh the branch in order to update the item in the
|
||||||
# change cache.
|
# change cache.
|
||||||
project.source.getChange(event, refresh=True)
|
with self.project_update_locks[project.canonical_name]:
|
||||||
|
project.source.getChange(event, refresh=True)
|
||||||
log = get_annotated_logger(self.log, event)
|
log = get_annotated_logger(self.log, event)
|
||||||
log.debug("Adding event")
|
log.debug("Adding event")
|
||||||
self.sched.addTriggerEvent(self.name, event)
|
self.sched.addTriggerEvent(self.name, event)
|
||||||
|
|||||||
Reference in New Issue
Block a user