Fixes for stacktach verifier processes
Fix memory usage for verifiers. Events to verify were being loaded from the DB into an in-memory FIFO queue to spool to worker processes. The queue was not size-limited, so a large amount of memory was used whenever events were read from the DB faster than they were processed. This change pauses the loading of events when the in-memory queue grows larger than the specified batchsize. Also, verifier child processes were not handling signals (such as SIGTERM) and therefore did not shut down cleanly; proper signal handling has been added. Change-Id: Ife25ca07398acf111f4388071b5f2e4eafeecb05
This commit is contained in:
parent
0c8ee8fc40
commit
f5a03f1afe
@ -563,11 +563,13 @@ class ImageExists(models.Model):
|
||||
self.status = new_status
|
||||
|
||||
@staticmethod
|
||||
def find_and_group_by_owner_and_raw_id(ending_max, status):
|
||||
def find_and_group_by_owner_and_raw_id(ending_max, status, batchsize=None):
|
||||
params = {'audit_period_ending__lte': dt.dt_to_decimal(ending_max),
|
||||
'status': status}
|
||||
ordered_exists = ImageExists.objects.select_related().\
|
||||
filter(**params).order_by('owner')
|
||||
if batchsize:
|
||||
ordered_exists = ordered_exists[:batchsize]
|
||||
result = {}
|
||||
for exist in ordered_exists:
|
||||
key = "%s-%s" % (exist.owner, exist.raw_id)
|
||||
|
@ -526,9 +526,11 @@ class GlanceVerifierTestCase(StacktachBaseTestCase):
|
||||
results = {'owner1': [exist1, exist2], 'owner2': [exist3]}
|
||||
sent_results = {'owner1': [exist4], 'owner2': [exist5]}
|
||||
models.ImageExists.find_and_group_by_owner_and_raw_id(
|
||||
batchsize=1000,
|
||||
ending_max=when_max,
|
||||
status=models.ImageExists.SENT_UNVERIFIED).AndReturn(sent_results)
|
||||
models.ImageExists.find_and_group_by_owner_and_raw_id(
|
||||
batchsize=1000,
|
||||
ending_max=when_max,
|
||||
status=models.ImageExists.PENDING).AndReturn(results)
|
||||
exist1.save()
|
||||
@ -569,9 +571,11 @@ class GlanceVerifierTestCase(StacktachBaseTestCase):
|
||||
exist3 = self.mox.CreateMockAnything()
|
||||
results = {'owner1': [exist1, exist2], 'owner2': [exist3]}
|
||||
models.ImageExists.find_and_group_by_owner_and_raw_id(
|
||||
batchsize=1000,
|
||||
ending_max=when_max,
|
||||
status=models.ImageExists.SENT_UNVERIFIED).AndReturn([])
|
||||
status=models.ImageExists.SENT_UNVERIFIED).AndReturn({})
|
||||
models.ImageExists.find_and_group_by_owner_and_raw_id(
|
||||
batchsize=1000,
|
||||
ending_max=when_max,
|
||||
status=models.ImageExists.PENDING).AndReturn(results)
|
||||
exist1.save()
|
||||
|
@ -860,11 +860,13 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase):
|
||||
ending_max=when_max, status='sent_unverified').AndReturn(sent_results)
|
||||
models.InstanceExists.find(
|
||||
ending_max=when_max, status='pending').AndReturn(results)
|
||||
sent_results.__getslice__(0, 1000).AndReturn(sent_results)
|
||||
results.__getslice__(0, 1000).AndReturn(results)
|
||||
sent_results.count().AndReturn(0)
|
||||
results.count().AndReturn(2)
|
||||
exist1 = self.mox.CreateMockAnything()
|
||||
exist2 = self.mox.CreateMockAnything()
|
||||
results.__getslice__(0, 1000).AndReturn(results)
|
||||
sent_results.__iter__().AndReturn([].__iter__())
|
||||
results.__iter__().AndReturn([exist1, exist2].__iter__())
|
||||
exist1.update_status('verifying')
|
||||
exist2.update_status('verifying')
|
||||
@ -896,11 +898,13 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase):
|
||||
ending_max=when_max, status='sent_unverified').AndReturn(sent_results)
|
||||
models.InstanceExists.find(
|
||||
ending_max=when_max, status='pending').AndReturn(results)
|
||||
sent_results.__getslice__(0, 1000).AndReturn(sent_results)
|
||||
results.__getslice__(0, 1000).AndReturn(results)
|
||||
sent_results.count().AndReturn(0)
|
||||
results.count().AndReturn(2)
|
||||
exist1 = self.mox.CreateMockAnything()
|
||||
exist2 = self.mox.CreateMockAnything()
|
||||
results.__getslice__(0, 1000).AndReturn(results)
|
||||
sent_results.__iter__().AndReturn([].__iter__())
|
||||
results.__iter__().AndReturn([exist1, exist2].__iter__())
|
||||
exist1.update_status('verifying')
|
||||
exist2.update_status('verifying')
|
||||
@ -928,12 +932,14 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase):
|
||||
ending_max=when_max, status='sent_unverified').AndReturn(sent_results)
|
||||
models.InstanceExists.find(
|
||||
ending_max=when_max, status='pending').AndReturn(results)
|
||||
sent_results.__getslice__(0, 1000).AndReturn(sent_results)
|
||||
results.__getslice__(0, 1000).AndReturn(results)
|
||||
sent_results.count().AndReturn(2)
|
||||
results.count().AndReturn(0)
|
||||
exist1 = self.mox.CreateMockAnything()
|
||||
exist2 = self.mox.CreateMockAnything()
|
||||
sent_results.__getslice__(0, 1000).AndReturn(sent_results)
|
||||
sent_results.__iter__().AndReturn([exist1, exist2].__iter__())
|
||||
results.__iter__().AndReturn([].__iter__())
|
||||
exist1.update_status('sent_verifying')
|
||||
exist2.update_status('sent_verifying')
|
||||
exist1.save()
|
||||
|
@ -193,7 +193,7 @@ class FakeVerifierConfig(object):
|
||||
self.nova_event_type = lambda: nova_event_type
|
||||
self.glance_event_type = lambda: glance_event_type
|
||||
self.flavor_field_name = lambda: flavor_field_name
|
||||
|
||||
self.batchsize = lambda: 1000
|
||||
|
||||
def make_verifier_config(notifs):
|
||||
topics = {'exchange': ['notifications.info']}
|
||||
|
@ -18,6 +18,7 @@ import datetime
|
||||
import decimal
|
||||
import os
|
||||
import re
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
import multiprocessing
|
||||
@ -108,6 +109,7 @@ def _is_alphanumeric(attr_name, attr_value, exist_id, instance_uuid):
|
||||
|
||||
|
||||
class Verifier(object):
|
||||
|
||||
def __init__(self, config, pool=None, reconciler=None, stats=None):
|
||||
self.config = config
|
||||
self.pool = pool or multiprocessing.Pool(config.pool_size())
|
||||
@ -115,10 +117,14 @@ class Verifier(object):
|
||||
self.reconciler = reconciler
|
||||
self.results = []
|
||||
self.failed = []
|
||||
self.batchsize = config.batchsize()
|
||||
if stats is None:
|
||||
self.stats = {}
|
||||
else:
|
||||
self.stats = stats
|
||||
self.update_interval = datetime.timedelta(seconds=30)
|
||||
self.next_update = datetime.datetime.utcnow() + self.update_interval
|
||||
self._do_run = True
|
||||
|
||||
def clean_results(self):
|
||||
pending = []
|
||||
@ -140,8 +146,39 @@ class Verifier(object):
|
||||
errored = finished - successful
|
||||
return len(self.results), successful, errored
|
||||
|
||||
def check_results(self, new_added, force=False):
    """Log queue progress and hold back while too many results are pending.

    A progress line is written when the periodic update time has passed,
    when ``force`` is true, or when more than ``self.batchsize`` results are
    queued.  After logging, the call sleeps in ``tick_time`` steps, pruning
    finished results each time, until the backlog is no larger than 75% of
    the batch size, and then schedules the next periodic update.

    :param new_added: number of newly queued items, reported as "N".
    :param force: write the progress line regardless of time or backlog.
    """
    # NOTE(review): block nesting was reconstructed from syntax because the
    # indentation of this listing is not available -- confirm that the
    # next_update reset belongs to the reporting branch.
    tick_time = self.config.tick_time()
    report_due = (datetime.datetime.utcnow() > self.next_update
                  or force
                  or len(self.results) > self.batchsize)
    if not report_due:
        return

    summary = (self.exchange(), new_added) + self.clean_results()
    _get_child_logger().info("%s: N: %s, P: %s, S: %s, E: %s" % summary)

    # Throttle the producer: wait for workers to drain the backlog.
    while True:
        if len(self.results) <= (self.batchsize * 0.75):
            break
        _get_child_logger().info(
            "%s: Waiting on event processing. Pending: %s" % (
                self.exchange(), len(self.results)))
        time.sleep(tick_time)
        self.clean_results()

    self.next_update = datetime.datetime.utcnow() + self.update_interval
|
||||
|
||||
def handle_signal(self, signal_number):
    """React to a signal delivered to this verifier process.

    SIGTERM clears the run flag and logs that shutdown has started;
    SIGUSR1 logs a short status report.  Other signals are ignored.

    :param signal_number: signal number handed over by the installed
                          signal handler.
    """
    log = _get_child_logger()
    # SIGKILL cannot be caught, so in practice only SIGTERM reaches this
    # branch; the tuple is kept as it was.
    if signal_number in (signal.SIGTERM, signal.SIGKILL):
        self._do_run = False
        log.info("%s verifier cleaning up for shutdown." % self.exchange())
    if signal_number == signal.SIGUSR1:
        # Five values are reported, so the template needs five
        # placeholders; "Parent PID" previously had none, which made the
        # formatting raise TypeError inside the signal handler.
        info = """
        %s verifier:
            PID: %s  Parent PID: %s
            Last watchdog check: %s
            # of items processed: %s
        """ % (self.exchange(), os.getpid(), os.getppid(),
               # stats defaults to an empty dict, so do not assume the
               # 'timestamp' key is present.
               self.stats.get('timestamp', 'unknown'),
               self.stats.get('total_processed', 0))
        log.info(info)
|
||||
|
||||
def _keep_running(self):
|
||||
return True
|
||||
return self._do_run
|
||||
|
||||
def _utcnow(self):
|
||||
return datetime.datetime.utcnow()
|
||||
@ -157,11 +194,9 @@ class Verifier(object):
|
||||
kwargs = {settle_units: settle_time}
|
||||
ending_max = now - datetime.timedelta(**kwargs)
|
||||
new = self.verify_for_range(ending_max, callback=callback)
|
||||
values = ((self.exchange(), new,) + self.clean_results())
|
||||
self.check_results(new, force=True)
|
||||
if self.reconciler:
|
||||
self.reconcile_failed()
|
||||
msg = "%s: N: %s, P: %s, S: %s, E: %s" % values
|
||||
_get_child_logger().info(msg)
|
||||
time.sleep(tick_time)
|
||||
|
||||
def run(self):
|
||||
|
@ -105,6 +105,8 @@ def nova_event_type():
|
||||
def glance_event_type():
    """Return the ``glance_event_type`` setting.

    Falls back to ``'image.exists.verified'`` when the setting is absent.
    """
    fallback = 'image.exists.verified'
    return config.get('glance_event_type', fallback)
|
||||
|
||||
def batchsize():
    """Return the ``batchsize`` setting, or 1000 when it is not set."""
    fallback = 1000
    return config.get('batchsize', fallback)
|
||||
|
||||
def flavor_field_name():
    """Return the ``flavor_field_name`` setting.

    The value is read with a plain subscript, so no default is applied.
    """
    setting = 'flavor_field_name'
    return config[setting]
|
||||
|
@ -158,42 +158,35 @@ def _verify(exists):
|
||||
|
||||
|
||||
class GlanceVerifier(Verifier):
|
||||
def __init__(self, config, pool=None, stats=None):
    """Initialise the verifier by delegating to the parent initialiser.

    :param config: passed through unchanged.
    :param pool: passed through as the ``pool`` keyword.
    :param stats: passed through as the ``stats`` keyword.
    """
    parent = super(GlanceVerifier, self)
    parent.__init__(config, pool=pool, stats=stats)
|
||||
|
||||
def verify_exists(self, grouped_exists, callback, verifying_status):
|
||||
count = len(grouped_exists)
|
||||
added = 0
|
||||
update_interval = datetime.timedelta(seconds=30)
|
||||
next_update = datetime.datetime.utcnow() + update_interval
|
||||
_get_child_logger().info("glance: Adding %s per-owner exists to queue." % count)
|
||||
while added < count:
|
||||
for exists in grouped_exists.values():
|
||||
for exist in exists:
|
||||
exist.status = verifying_status
|
||||
exist.save()
|
||||
result = self.pool.apply_async(_verify, args=(exists,),
|
||||
callback=callback)
|
||||
self.results.append(result)
|
||||
added += 1
|
||||
if datetime.datetime.utcnow() > next_update:
|
||||
values = ((added,) + self.clean_results())
|
||||
msg = "glance: N: %s, P: %s, S: %s, E: %s" % values
|
||||
_get_child_logger().info(msg)
|
||||
next_update = datetime.datetime.utcnow() + update_interval
|
||||
for exists in grouped_exists.values():
|
||||
for exist in exists:
|
||||
exist.status = verifying_status
|
||||
exist.save()
|
||||
result = self.pool.apply_async(_verify, args=(exists,),
|
||||
callback=callback)
|
||||
self.results.append(result)
|
||||
added += 1
|
||||
self.check_results(added)
|
||||
return count
|
||||
|
||||
def verify_for_range(self, ending_max, callback=None):
|
||||
unsent_exists_grouped_by_owner_and_rawid = \
|
||||
models.ImageExists.find_and_group_by_owner_and_raw_id(
|
||||
ending_max=ending_max,
|
||||
status=models.ImageExists.SENT_UNVERIFIED)
|
||||
status=models.ImageExists.SENT_UNVERIFIED,
|
||||
batchsize=self.batchsize)
|
||||
unsent_count = self.verify_exists(unsent_exists_grouped_by_owner_and_rawid,
|
||||
None, models.ImageExists.SENT_VERIFYING)
|
||||
exists_grouped_by_owner_and_rawid = \
|
||||
models.ImageExists.find_and_group_by_owner_and_raw_id(
|
||||
ending_max=ending_max,
|
||||
status=models.ImageExists.PENDING)
|
||||
status=models.ImageExists.PENDING,
|
||||
batchsize=self.batchsize)
|
||||
count = self.verify_exists(exists_grouped_by_owner_and_rawid, callback,
|
||||
models.ImageExists.VERIFYING)
|
||||
|
||||
|
@ -322,36 +322,31 @@ class NovaVerifier(base_verifier.Verifier):
|
||||
def verify_exists(self, callback, exists, verifying_status):
|
||||
count = exists.count()
|
||||
added = 0
|
||||
update_interval = datetime.timedelta(seconds=30)
|
||||
next_update = datetime.datetime.utcnow() + update_interval
|
||||
_get_child_logger().info("nova: Adding %s exists to queue." % count)
|
||||
while added < count:
|
||||
for exist in exists[0:1000]:
|
||||
exist.update_status(verifying_status)
|
||||
exist.save()
|
||||
validation_level = self.config.validation_level()
|
||||
result = self.pool.apply_async(
|
||||
_verify, args=(exist, validation_level),
|
||||
callback=callback)
|
||||
self.results.append(result)
|
||||
added += 1
|
||||
if datetime.datetime.utcnow() > next_update:
|
||||
values = ((added,) + self.clean_results())
|
||||
msg = "nova: N: %s, P: %s, S: %s, E: %s" % values
|
||||
_get_child_logger().info(msg)
|
||||
next_update = datetime.datetime.utcnow() + update_interval
|
||||
for exist in exists:
|
||||
exist.update_status(verifying_status)
|
||||
exist.save()
|
||||
validation_level = self.config.validation_level()
|
||||
result = self.pool.apply_async(
|
||||
_verify, args=(exist, validation_level),
|
||||
callback=callback)
|
||||
self.results.append(result)
|
||||
added += 1
|
||||
self.check_results(added)
|
||||
return count
|
||||
|
||||
def verify_for_range(self, ending_max, callback=None):
    """Queue one batch each of sent-unverified and pending exists records.

    Both lookups are limited to ``self.batchsize`` rows by slicing the
    result of ``InstanceExists.find``.  Sent-unverified records are queued
    without a callback; pending records use ``callback``.

    :param ending_max: upper bound handed to ``InstanceExists.find``.
    :param callback: callback used for the pending records only.
    :returns: sum of the counts returned by both ``verify_exists`` calls.
    """
    exists_model = models.InstanceExists

    sent_batch = exists_model.find(
        ending_max=ending_max,
        status=exists_model.SENT_UNVERIFIED)[:self.batchsize]
    sent_total = self.verify_exists(
        None, sent_batch, exists_model.SENT_VERIFYING)

    pending_batch = exists_model.find(
        ending_max=ending_max,
        status=exists_model.PENDING)[:self.batchsize]
    pending_total = self.verify_exists(
        callback, pending_batch, exists_model.VERIFYING)

    return pending_total + sent_total
|
||||
|
@ -74,6 +74,13 @@ def make_and_start_verifier(exchange, stats=None):
|
||||
verifier = glance_verifier.GlanceVerifier(verifier_config,
|
||||
stats=stats)
|
||||
|
||||
def sig_handler(signal_number, frame):
|
||||
verifier.handle_signal(signal_number)
|
||||
|
||||
signal.signal(signal.SIGINT, sig_handler)
|
||||
signal.signal(signal.SIGTERM, sig_handler)
|
||||
signal.signal(signal.SIGUSR1, sig_handler)
|
||||
|
||||
verifier.run()
|
||||
|
||||
|
||||
@ -148,10 +155,21 @@ def stop_all():
|
||||
processes[pname]['pid'] = 0
|
||||
|
||||
|
||||
def kill_time(signal, frame):
|
||||
def signal_all(signal_number):
    """Send ``signal_number`` to every tracked process that is alive.

    Processes are visited in sorted name order; entries for which
    ``is_alive`` is false are skipped.
    """
    for pname in sorted(processes):
        entry = processes[pname]
        if not is_alive(entry):
            continue
        os.kill(entry['pid'], signal_number)
|
||||
|
||||
|
||||
def kill_time(signal_number, frame):
    """Signal handler for the parent verifier process.

    SIGINT and SIGTERM clear ``RUNNING`` and stop all child processes;
    SIGUSR1 is forwarded to every live child.  Other signals are ignored.

    :param signal_number: number of the signal being handled.
    :param frame: current stack frame (unused).
    """
    global RUNNING
    # This handler is installed for SIGINT as well, so Ctrl-C has to
    # trigger shutdown too.  SIGKILL is not listed because it can never be
    # delivered to a handler.
    if signal_number in (signal.SIGINT, signal.SIGTERM):
        RUNNING = False
        stop_all()
    elif signal_number == signal.SIGUSR1:
        signal_all(signal.SIGUSR1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
@ -166,6 +184,7 @@ if __name__ == '__main__':
|
||||
|
||||
signal.signal(signal.SIGINT, kill_time)
|
||||
signal.signal(signal.SIGTERM, kill_time)
|
||||
signal.signal(signal.SIGUSR1, kill_time)
|
||||
|
||||
logger.info("Starting Verifiers...")
|
||||
while RUNNING:
|
||||
|
Loading…
x
Reference in New Issue
Block a user