diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index 69b39a61f..346a881ce 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -38,22 +38,20 @@ LOG = logging.getLogger(__name__) def get_pids(): - uwsgi_dict = {} + # needs to be compatible with psutil >= 1.1.1 since it's a global req. + PSUTIL2 = psutil.version_info >= (2, 0) + result = set([]) for pid in psutil.get_pid_list(): try: p = psutil.Process(pid) - if p.cmdline and p.cmdline[0].find('/uwsgi'): - if p.parent: - uwsgi_dict[p.pid] = p.parent.pid + name = p.name() if PSUTIL2 else p.name + if name == 'uwsgi': + LOG.debug('Found uwsgi process, pid: %s', pid) + result.add(pid) except Exception as e: LOG.debug('Exception while iterating process list: %s', e) pass - result = set() - for pid in uwsgi_dict: - if uwsgi_dict[pid] in uwsgi_dict: - result.add(pid) - return result diff --git a/stackalytics/processor/runtime_storage.py b/stackalytics/processor/runtime_storage.py index 9b0455961..f10c25547 100644 --- a/stackalytics/processor/runtime_storage.py +++ b/stackalytics/processor/runtime_storage.py @@ -162,6 +162,7 @@ class MemcachedStorage(RuntimeStorage): stored_pids = self.memcached.get('pids') or set() for pid in stored_pids: if pid not in pids: + LOG.debug('Purge dead uwsgi pid %s from pids list', pid) self.memcached.delete('pid:%s' % pid) self.memcached.set('pids', pids) @@ -175,11 +176,15 @@ class MemcachedStorage(RuntimeStorage): min_update = n first_valid_update = self.memcached.get('first_valid_update') or 0 + LOG.debug('Purge polled updates from %(first)s to %(min)s', + {'first': first_valid_update, 'min': min_update}) + for delete_id_set in utils.make_range(first_valid_update, min_update, BULK_DELETE_SIZE): if not self.memcached.delete_multi(delete_id_set, key_prefix=UPDATE_ID_PREFIX): raise Exception('Failed to delete from memcache') + self.memcached.set('first_valid_update', min_update) def _get_update_count(self):