Merge "Fix a concurrency issue when locking reprocessing tasks"
This commit is contained in:
@@ -614,13 +614,17 @@ class CloudKittyProcessor(cotyledon.Service):
|
|||||||
lock_name, lock = get_lock(
|
lock_name, lock = get_lock(
|
||||||
self.coord, self.generate_lock_base_name(tenant_id))
|
self.coord, self.generate_lock_base_name(tenant_id))
|
||||||
|
|
||||||
LOG.debug('[Worker: {w}] Trying to acquire lock "{lock_name}".'
|
LOG.debug('[Worker: {w}] Trying to acquire lock "{lock_name}" for '
|
||||||
.format(w=self._worker_id, lock_name=lock_name))
|
'scope ID {scope_id}.'.format(w=self._worker_id,
|
||||||
|
lock_name=lock_name,
|
||||||
|
scope_id=tenant_id))
|
||||||
|
|
||||||
lock_acquired = lock.acquire(blocking=False)
|
lock_acquired = lock.acquire(blocking=False)
|
||||||
if lock_acquired:
|
if lock_acquired:
|
||||||
LOG.debug('[Worker: {w}] Acquired lock "{lock_name}".'.format(
|
LOG.debug('[Worker: {w}] Acquired lock "{lock_name}" for '
|
||||||
w=self._worker_id, lock_name=lock_name))
|
'scope ID {scope_id}.'.format(w=self._worker_id,
|
||||||
|
lock_name=lock_name,
|
||||||
|
scope_id=tenant_id))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.process_scope(tenant_id)
|
self.process_scope(tenant_id)
|
||||||
@@ -702,9 +706,10 @@ class CloudKittyReprocessor(CloudKittyProcessor):
|
|||||||
self._worker_id, len(self.tenants))
|
self._worker_id, len(self.tenants))
|
||||||
|
|
||||||
def generate_lock_base_name(self, scope):
|
def generate_lock_base_name(self, scope):
|
||||||
return "%s-id=%s-start=%s-end=%s-current=%s" % (
|
return "%s-id=%s-start=%s-end=%s" % (self.worker_class,
|
||||||
self.worker_class, scope.identifier, scope.start_reprocess_time,
|
scope.identifier,
|
||||||
scope.end_reprocess_time, scope.current_reprocess_time)
|
scope.start_reprocess_time,
|
||||||
|
scope.end_reprocess_time)
|
||||||
|
|
||||||
|
|
||||||
class CloudKittyServiceManager(cotyledon.ServiceManager):
|
class CloudKittyServiceManager(cotyledon.ServiceManager):
|
||||||
|
|||||||
@@ -623,10 +623,9 @@ class CloudKittyReprocessorTest(tests.TestCase):
|
|||||||
|
|
||||||
expected_lock_name = "<class 'cloudkitty.orchestrator." \
|
expected_lock_name = "<class 'cloudkitty.orchestrator." \
|
||||||
"ReprocessingWorker'>-id=scope_identifier-" \
|
"ReprocessingWorker'>-id=scope_identifier-" \
|
||||||
"start=%s-end=%s-current=%s" % (
|
"start=%s-end=%s" % (
|
||||||
scope_mock.start_reprocess_time,
|
scope_mock.start_reprocess_time,
|
||||||
scope_mock.end_reprocess_time,
|
scope_mock.end_reprocess_time)
|
||||||
scope_mock.current_reprocess_time)
|
|
||||||
|
|
||||||
self.assertEqual(expected_lock_name, return_generate_lock_name)
|
self.assertEqual(expected_lock_name, return_generate_lock_name)
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,4 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Fixed concurrency issues during reprocessing tasks.
|
||||||
Reference in New Issue
Block a user