Handle log filter exceptions more gracefully.
If there is an exception filtering a log event handle that by removing the filter and continuing to process the remaining log events for the assocaited file. This prevents non filter data from being lost when the filters have an exception. Change-Id: I65141daf21a873096829c41fdc2c77cbeecde2e3
This commit is contained in:
parent
ad7485f8e9
commit
cb6e4669f0
@ -52,6 +52,10 @@ def semi_busy_wait(seconds):
|
|||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
|
class FilterException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class CRM114Filter(object):
|
class CRM114Filter(object):
|
||||||
def __init__(self, script, path, build_status):
|
def __init__(self, script, path, build_status):
|
||||||
self.p = None
|
self.p = None
|
||||||
@ -77,14 +81,14 @@ class CRM114Filter(object):
|
|||||||
[self.p.stdin, self.p.stdout], 20)
|
[self.p.stdin, self.p.stdout], 20)
|
||||||
if not r:
|
if not r:
|
||||||
self.p.kill()
|
self.p.kill()
|
||||||
raise Exception('Timeout reading from CRM114')
|
raise FilterException('Timeout reading from CRM114')
|
||||||
r = self.p.stdout.readline()
|
r = self.p.stdout.readline()
|
||||||
if not r:
|
if not r:
|
||||||
err = self.p.stderr.read()
|
err = self.p.stderr.read()
|
||||||
if err:
|
if err:
|
||||||
raise Exception(err)
|
raise FilterException(err)
|
||||||
else:
|
else:
|
||||||
raise Exception('Early EOF from CRM114')
|
raise FilterException('Early EOF from CRM114')
|
||||||
r = r.strip()
|
r = r.strip()
|
||||||
data['error_pr'] = float(r)
|
data['error_pr'] = float(r)
|
||||||
|
|
||||||
@ -143,6 +147,7 @@ class LogRetriever(threading.Thread):
|
|||||||
for f in self.filters:
|
for f in self.filters:
|
||||||
logging.debug("Adding filter: %s" % f.name)
|
logging.debug("Adding filter: %s" % f.name)
|
||||||
filters.append(f.create(fields))
|
filters.append(f.create(fields))
|
||||||
|
all_filters = filters
|
||||||
|
|
||||||
logging.debug("Pushing " + str(len(log_lines)) + " log lines.")
|
logging.debug("Pushing " + str(len(log_lines)) + " log lines.")
|
||||||
base_event = {}
|
base_event = {}
|
||||||
@ -151,10 +156,17 @@ class LogRetriever(threading.Thread):
|
|||||||
for line in log_lines:
|
for line in log_lines:
|
||||||
out_event = base_event.copy()
|
out_event = base_event.copy()
|
||||||
out_event["message"] = line
|
out_event["message"] = line
|
||||||
|
new_filters = []
|
||||||
for f in filters:
|
for f in filters:
|
||||||
f.process(out_event)
|
try:
|
||||||
|
f.process(out_event)
|
||||||
|
new_filters.append(f)
|
||||||
|
except FilterException:
|
||||||
|
logging.exception("Exception filtering event: "
|
||||||
|
"%s" % line.encode("utf-8"))
|
||||||
|
filters = new_filters
|
||||||
self.logq.put(out_event)
|
self.logq.put(out_event)
|
||||||
for f in filters:
|
for f in all_filters:
|
||||||
f.close()
|
f.close()
|
||||||
job.sendWorkComplete()
|
job.sendWorkComplete()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user