Use concurrency to retrieve unencrypted secret data
This patch set uses concurrent.futures.ThreadPoolExecutor [0] to retrieve multiple Barbican secrets concurrently. This is because currently it is only possible to retrieve 1 secret payload from Barbican at a time -- for revisions with several dozen secrets it is therefore too costly to serially perform these API requests. A new configuration option is added to the [barbican] group called `max_workers` which specifies the number of threads to use. The default value is 10. Note that: "If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5" [0] so the default is 10 for 2 * 5 which is overly conservative if anything. If any error occurs during any of the requests a 500 is raised with appropriate details. [0] https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor Change-Id: I76a5bb6c345054e160c14bdf9fb7087e3a746a5e
This commit is contained in:
parent
1583b78902
commit
d27ab2d8ea
@ -24,6 +24,7 @@ barbican_group = cfg.OptGroup(
|
|||||||
help="Barbican options for allowing Deckhand to communicate with "
|
help="Barbican options for allowing Deckhand to communicate with "
|
||||||
"Barbican.")
|
"Barbican.")
|
||||||
|
|
||||||
|
|
||||||
barbican_opts = [
|
barbican_opts = [
|
||||||
# TODO(fmontei): Drop these options and related group once Keystone
|
# TODO(fmontei): Drop these options and related group once Keystone
|
||||||
# endpoint lookup is used instead.
|
# endpoint lookup is used instead.
|
||||||
@ -31,6 +32,10 @@ barbican_opts = [
|
|||||||
'api_endpoint',
|
'api_endpoint',
|
||||||
sample_default='http://barbican.example.org:9311/',
|
sample_default='http://barbican.example.org:9311/',
|
||||||
help='URL override for the Barbican API endpoint.'),
|
help='URL override for the Barbican API endpoint.'),
|
||||||
|
cfg.IntOpt(
|
||||||
|
'max_workers', default=10,
|
||||||
|
help='Maximum number of threads used to call secret storage service '
|
||||||
|
'concurrently.')
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -12,7 +12,10 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import concurrent.futures
|
||||||
|
|
||||||
import falcon
|
import falcon
|
||||||
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_utils import excutils
|
from oslo_utils import excutils
|
||||||
import six
|
import six
|
||||||
@ -31,6 +34,7 @@ from deckhand import errors
|
|||||||
from deckhand import policy
|
from deckhand import policy
|
||||||
from deckhand import types
|
from deckhand import types
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@ -113,7 +117,7 @@ class RenderedDocumentsResource(api_base.BaseResource):
|
|||||||
|
|
||||||
data = self._retrieve_documents_for_rendering(revision_id, **filters)
|
data = self._retrieve_documents_for_rendering(revision_id, **filters)
|
||||||
documents = document_wrapper.DocumentDict.from_list(data)
|
documents = document_wrapper.DocumentDict.from_list(data)
|
||||||
encryption_sources = self._retrieve_encrypted_documents(documents)
|
encryption_sources = self._resolve_encrypted_data(documents)
|
||||||
try:
|
try:
|
||||||
# NOTE(fmontei): `validate` is False because documents have already
|
# NOTE(fmontei): `validate` is False because documents have already
|
||||||
# been pre-validated during ingestion. Documents are post-validated
|
# been pre-validated during ingestion. Documents are post-validated
|
||||||
@ -194,23 +198,50 @@ class RenderedDocumentsResource(api_base.BaseResource):
|
|||||||
|
|
||||||
return documents
|
return documents
|
||||||
|
|
||||||
def _retrieve_encrypted_documents(self, documents):
|
def _resolve_encrypted_data(self, documents):
|
||||||
|
"""Resolve unencrypted data from the secret storage backend.
|
||||||
|
|
||||||
|
Submits concurrent requests to the secret storage backend for all
|
||||||
|
secret references for which unecrypted data is required for future
|
||||||
|
substitutions during the rendering process.
|
||||||
|
|
||||||
|
:param documents: List of all documents for the current revision.
|
||||||
|
:type documents: List[dict]
|
||||||
|
:returns: Dictionary keyed with secret references, whose values are
|
||||||
|
the corresponding unencrypted data.
|
||||||
|
:rtype: dict
|
||||||
|
|
||||||
|
"""
|
||||||
encryption_sources = {}
|
encryption_sources = {}
|
||||||
for document in documents:
|
secret_ref = lambda x: x.data
|
||||||
if document.is_encrypted and document.has_barbican_ref:
|
is_encrypted = lambda x: x.is_encrypted and x.has_barbican_ref
|
||||||
|
encrypted_documents = (d for d in documents if is_encrypted(d))
|
||||||
|
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(
|
||||||
|
max_workers=CONF.barbican.max_workers) as executor:
|
||||||
|
future_to_document = {
|
||||||
|
executor.submit(secrets_manager.SecretsManager.get,
|
||||||
|
secret_ref=secret_ref(d),
|
||||||
|
src_doc=d): d for d in encrypted_documents
|
||||||
|
}
|
||||||
|
for future in concurrent.futures.as_completed(future_to_document):
|
||||||
|
document = future_to_document[future]
|
||||||
try:
|
try:
|
||||||
unecrypted_data = secrets_manager.SecretsManager.get(
|
unecrypted_data = future.result()
|
||||||
secret_ref=document.data, src_doc=document)
|
except Exception as exc:
|
||||||
except Exception as e:
|
msg = ('Failed to retrieve a required secret from the '
|
||||||
LOG.error(
|
'configured secret storage service. Document: [%s,'
|
||||||
'An unknown exception occurred while trying to resolve'
|
' %s] %s. Secret ref: %s' % (
|
||||||
' a secret reference for substitution source document '
|
document.schema,
|
||||||
'[%s, %s] %s.', document.schema, document.layer,
|
document.layer,
|
||||||
document.name)
|
document.name,
|
||||||
raise errors.UnknownSubstitutionError(
|
secret_ref(document)))
|
||||||
src_schema=document.schema, src_layer=document.layer,
|
LOG.error(msg + '. Details: %s', exc)
|
||||||
src_name=document.name, details=str(e))
|
raise falcon.HTTPInternalServerError(description=msg)
|
||||||
encryption_sources.setdefault(document.data, unecrypted_data)
|
else:
|
||||||
|
encryption_sources.setdefault(secret_ref(document),
|
||||||
|
unecrypted_data)
|
||||||
|
|
||||||
return encryption_sources
|
return encryption_sources
|
||||||
|
|
||||||
def _post_validate(self, rendered_documents):
|
def _post_validate(self, rendered_documents):
|
||||||
|
@ -167,7 +167,7 @@ class SecretsSubstitution(object):
|
|||||||
contained in the destination document's data section to the
|
contained in the destination document's data section to the
|
||||||
actual unecrypted data. If encrypting data with Barbican, the
|
actual unecrypted data. If encrypting data with Barbican, the
|
||||||
reference will be a Barbican secret reference.
|
reference will be a Barbican secret reference.
|
||||||
:type encryption_sources: List[dict]
|
:type encryption_sources: dict
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# This maps a 2-tuple of (schema, name) to a document from which the
|
# This maps a 2-tuple of (schema, name) to a document from which the
|
||||||
|
3
tox.ini
3
tox.ini
@ -114,7 +114,8 @@ commands =
|
|||||||
# [H210] Require ‘autospec’, ‘spec’, or ‘spec_set’ in mock.patch/mock.patch.object calls
|
# [H210] Require ‘autospec’, ‘spec’, or ‘spec_set’ in mock.patch/mock.patch.object calls
|
||||||
# [H904] Delay string interpolations at logging calls.
|
# [H904] Delay string interpolations at logging calls.
|
||||||
enable-extensions = H106,H203,H204,H205,H210,H904
|
enable-extensions = H106,H203,H204,H205,H210,H904
|
||||||
ignore = H405
|
# [E731] Do not assign a lambda expression, use a def. This reduces readability in some cases.
|
||||||
|
ignore = E731,H405
|
||||||
exclude = .venv,.git,.tox,dist,*lib/python*,*egg,build,releasenotes,doc,alembic/versions
|
exclude = .venv,.git,.tox,dist,*lib/python*,*egg,build,releasenotes,doc,alembic/versions
|
||||||
|
|
||||||
[testenv:docs]
|
[testenv:docs]
|
||||||
|
Loading…
Reference in New Issue
Block a user