Filter out details from taskflow logs with v2+jobboard

When enabling INFO-level logs in taskflow and using jobboard with
amphorav2, taskflow prints the string representation of a job when it is
completed. It includes the parameters of the flow, which might include
private information from TLS-enabled listeners and pools such as
certificates, private_key and intermediate certificates.

This commit filters out the private information from the logs by using
logging.Filter, it replaces private attributes with '***'.

Story 2010523
Task 47125

Conflicts:
	octavia/tests/unit/common/test_base_taskflow.py

Change-Id: I2df8a49851feb1445b5128ce99b880ddb77782ad
(cherry picked from commit 6c731fa2fd)
(cherry picked from commit 4022aaf781)
(cherry picked from commit 46f774ad14)
(cherry picked from commit 6f19756278)
(cherry picked from commit deba44d022)
This commit is contained in:
Gregory Thiemonge 2023-01-22 02:41:26 -05:00
parent 1f617ff40a
commit f62cd60f1b
3 changed files with 116 additions and 0 deletions

View File

@ -23,6 +23,7 @@ from oslo_utils import uuidutils
from taskflow.conductors.backends import impl_blocking
from taskflow import engines
from taskflow import exceptions as taskflow_exc
from taskflow.jobs.base import Job
from taskflow.listeners import base
from taskflow.listeners import logging
from taskflow.persistence import models
@ -47,6 +48,44 @@ def retryMaskFilter(record):
LOG.logger.addFilter(retryMaskFilter)
def _details_filter(obj):
if isinstance(obj, dict):
ret = {}
for key in obj:
if (key in ('certificate', 'private_key', 'passphrase') and
isinstance(obj[key], str)):
ret[key] = '***'
elif key == 'intermediates' and isinstance(obj[key], list):
ret[key] = ['***'] * len(obj[key])
else:
ret[key] = _details_filter(obj[key])
return ret
if isinstance(obj, list):
return [_details_filter(e) for e in obj]
return obj
class FilteredJob(Job):
def __str__(self):
# Override the detault __str__ method from taskflow.job.base.Job,
# filter out private information from details
cls_name = type(self).__name__
details = _details_filter(self.details)
return "%s: %s (priority=%s, uuid=%s, details=%s)" % (
cls_name, self.name, self.priority,
self.uuid, details)
class JobDetailsFilter(log.logging.Filter):
def filter(self, record):
# If the first arg is a Job, convert it now to a string with our custom
# method
if isinstance(record.args[0], Job):
arg0 = record.args[0]
record.args = (FilteredJob.__str__(arg0),) + record.args[1:]
return True
class BaseTaskFlowEngine(object):
"""This is the task flow engine
@ -125,6 +164,11 @@ class TaskFlowServiceController(object):
def __init__(self, driver):
self.driver = driver
# Install filter for taskflow executor logger
taskflow_logger = log.logging.getLogger(
"taskflow.conductors.backends.impl_executor")
taskflow_logger.addFilter(JobDetailsFilter())
def run_poster(self, flow_factory, *args, **kwargs):
with self.driver.persistence_driver.get_persistence() as persistence:
with self.driver.job_board(persistence) as job_board:

View File

@ -18,6 +18,7 @@ from unittest import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from taskflow import engines as tf_engines
from taskflow.jobs.base import Job
from octavia.common import base_taskflow
import octavia.tests.unit.base as base
@ -154,3 +155,62 @@ class TestTaskFlowServiceController(base.TestCase):
"test2", self.jobboard_mock.__enter__(),
persistence=self.persistence_mock.__enter__(),
engine='parallel')
class TestJobDetailsFilter(base.TestCase):
def test_filter(self):
log_filter = base_taskflow.JobDetailsFilter()
tls_container_data = {
'certificate': '<CERTIFICATE>',
'private_key': '<PRIVATE_KEY>',
'passphrase': '<PASSPHRASE>',
'intermediates': [
'<INTERMEDIATE1>',
'<INTERMEDIATE2>'
]
}
job = mock.Mock(spec=Job)
job.details = {
'store': {
'listeners': [
{
'name': 'listener_name',
'default_tls_container_data': tls_container_data
}
],
'any_recursive': {
'type': [
{
'other_list': [
tls_container_data,
{
'test': tls_container_data,
}
]
}
]
}
}
}
record = mock.Mock()
record.args = (job, 'something')
ret = log_filter.filter(record)
self.assertTrue(ret)
self.assertNotIn(tls_container_data['certificate'], record.args[0])
self.assertNotIn(tls_container_data['private_key'], record.args[0])
self.assertNotIn(tls_container_data['passphrase'], record.args[0])
self.assertNotIn(tls_container_data['intermediates'][0],
record.args[0])
self.assertNotIn(tls_container_data['intermediates'][1],
record.args[0])
self.assertIn('listener_name', record.args[0])
record.args = ('arg1', 2)
ret = log_filter.filter(record)
self.assertTrue(ret)

View File

@ -0,0 +1,12 @@
---
security:
- |
Filter out private information from the taskflow logs when ''INFO'' level
messages are enabled and when jobboard is enabled. Logs might have included
TLS certificates and private_key. By default, in Octavia only WARNING and
above messages are enabled in taskflow and jobboard is disabled.
fixes:
- |
The parameters of a taskflow Flow were logged in ''INFO'' level messages by
taskflow, it included TLS-enabled listeners and pools parameters, such as
certificates and private_key.