Filter out details from taskflow logs with v2+jobboard

When enabling INFO-level logs in taskflow and using jobboard with
amphorav2, taskflow prints the string representation of a job when it is
completed. It includes the parameters of the flow, which might include
private information from TLS-enabled listeners and pools such as
certificates, private_key and intermediate certificates.

This commit filters out the private information from the logs by using
logging.Filter, it replaces private attributes with '***'.

Story 2010523
Task 47125

Change-Id: I2df8a49851feb1445b5128ce99b880ddb77782ad
This commit is contained in:
Gregory Thiemonge 2023-01-22 02:41:26 -05:00
parent eea6ac62fd
commit 6c731fa2fd
3 changed files with 116 additions and 0 deletions

View File

@ -24,6 +24,7 @@ from oslo_utils import uuidutils
from taskflow.conductors.backends import impl_blocking
from taskflow import engines
from taskflow import exceptions as taskflow_exc
from taskflow.jobs.base import Job
from taskflow.listeners import base
from taskflow.listeners import logging
from taskflow.persistence import models
@ -51,6 +52,44 @@ def retryMaskFilter(record):
LOG.logger.addFilter(retryMaskFilter)
def _details_filter(obj):
if isinstance(obj, dict):
ret = {}
for key in obj:
if (key in ('certificate', 'private_key', 'passphrase') and
isinstance(obj[key], str)):
ret[key] = '***'
elif key == 'intermediates' and isinstance(obj[key], list):
ret[key] = ['***'] * len(obj[key])
else:
ret[key] = _details_filter(obj[key])
return ret
if isinstance(obj, list):
return [_details_filter(e) for e in obj]
return obj
class FilteredJob(Job):
def __str__(self):
# Override the detault __str__ method from taskflow.job.base.Job,
# filter out private information from details
cls_name = type(self).__name__
details = _details_filter(self.details)
return "%s: %s (priority=%s, uuid=%s, details=%s)" % (
cls_name, self.name, self.priority,
self.uuid, details)
class JobDetailsFilter(log.logging.Filter):
def filter(self, record):
# If the first arg is a Job, convert it now to a string with our custom
# method
if isinstance(record.args[0], Job):
arg0 = record.args[0]
record.args = (FilteredJob.__str__(arg0),) + record.args[1:]
return True
class BaseTaskFlowEngine(object):
"""This is the task flow engine
@ -129,6 +168,11 @@ class TaskFlowServiceController(object):
def __init__(self, driver):
self.driver = driver
# Install filter for taskflow executor logger
taskflow_logger = log.logging.getLogger(
"taskflow.conductors.backends.impl_executor")
taskflow_logger.addFilter(JobDetailsFilter())
def run_poster(self, flow_factory, *args, **kwargs):
with self.driver.persistence_driver.get_persistence() as persistence:
with self.driver.job_board(persistence) as job_board:

View File

@ -18,6 +18,7 @@ from unittest import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from taskflow import engines as tf_engines
from taskflow.jobs.base import Job
from octavia.common import base_taskflow
import octavia.tests.unit.base as base
@ -173,3 +174,62 @@ class TestTaskFlowServiceController(base.TestCase):
job1.extend_expiry.assert_called_once_with(30)
job2.extend_expiry.assert_not_called()
job3.extend_expiry.assert_not_called()
class TestJobDetailsFilter(base.TestCase):
def test_filter(self):
log_filter = base_taskflow.JobDetailsFilter()
tls_container_data = {
'certificate': '<CERTIFICATE>',
'private_key': '<PRIVATE_KEY>',
'passphrase': '<PASSPHRASE>',
'intermediates': [
'<INTERMEDIATE1>',
'<INTERMEDIATE2>'
]
}
job = mock.Mock(spec=Job)
job.details = {
'store': {
'listeners': [
{
'name': 'listener_name',
'default_tls_container_data': tls_container_data
}
],
'any_recursive': {
'type': [
{
'other_list': [
tls_container_data,
{
'test': tls_container_data,
}
]
}
]
}
}
}
record = mock.Mock()
record.args = (job, 'something')
ret = log_filter.filter(record)
self.assertTrue(ret)
self.assertNotIn(tls_container_data['certificate'], record.args[0])
self.assertNotIn(tls_container_data['private_key'], record.args[0])
self.assertNotIn(tls_container_data['passphrase'], record.args[0])
self.assertNotIn(tls_container_data['intermediates'][0],
record.args[0])
self.assertNotIn(tls_container_data['intermediates'][1],
record.args[0])
self.assertIn('listener_name', record.args[0])
record.args = ('arg1', 2)
ret = log_filter.filter(record)
self.assertTrue(ret)

View File

@ -0,0 +1,12 @@
---
security:
- |
Filter out private information from the taskflow logs when ''INFO'' level
messages are enabled and when jobboard is enabled. Logs might have included
TLS certificates and private_key. By default, in Octavia only WARNING and
above messages are enabled in taskflow and jobboard is disabled.
fixes:
- |
The parameters of a taskflow Flow were logged in ''INFO'' level messages by
taskflow, it included TLS-enabled listeners and pools parameters, such as
certificates and private_key.