Support b64 encoding of passphrase catalog

Some applications, such as k8s, require a base64 encoded string.
This patch updates the passphrase catalog such that the user can
specify to use base64 encoding on one or more passphrases found in
the passphrase catalog.

We add that support by:
1. Updating pegleg.engine.catalogs.passphrase_catalog to include a
   method which determines what encoding type to use, if any.
2. Updating pegleg.engine.generators.passphrase_generator.generate
   to encode the passphrase in base64 if detected. This change is
   designed to easily add other supported encoding methods in the
   future if desired.
3. Updating tests.unit.engine.test_generate_passphrases to
   demonstrate that the encoding field in passphrase catalog is
   being used, and that the resultant passphrase is in fact base64
   encoded. Also show that when encoding type is not specified, or
   is set to 'none' that base64 encoding does not take place.
4. Updating tests.unit.engine.test_generate_passphrases to
   demonstrate that the encoding field in passphrase catalog is
   being used, and that the resultant passphrase is in fact base64
   encoded.  We also demonstrate the flow from original passphrase
   to bytes, to base64 encoded, to encrypted, and back again yields
   the expected values at each step of encoding/decoding/encryption
   and decryption.

Change-Id: I47c740ca13be57ed74b6780f80c90b39e935708b
This commit is contained in:
Alexander Hughes 2019-05-10 09:30:53 -05:00
parent 498d5c078f
commit 5a58ba807a
4 changed files with 119 additions and 2 deletions

View File

@ -24,8 +24,10 @@ P_LENGTH = 'length'
P_DESCRIPTION = 'description'
P_ENCRYPTED = 'encrypted'
P_CLEARTEXT = 'cleartext'
P_ENCODING = 'encoding'
P_DEFAULT_LENGTH = 24
P_DEFAULT_STORAGE_POLICY = 'encrypted'
P_DEFAULT_ENCODING = 'none'
__all__ = ['PassphraseCatalog']
@ -86,3 +88,18 @@ class PassphraseCatalog(BaseCatalog):
return P_CLEARTEXT
else:
return P_DEFAULT_STORAGE_POLICY
def get_encoding_method(self, passphrase_name):
"""Return the encoding method of the ``passphrase_name``.
If the catalog does not specify an encoding method for the
``passphrase_name``, return the default encoding method, 'none'.
:param str passphrase_name: The name of the passphrase to evaluate.
:returns: The encoding method to be used for ``passphrase_name``.
:rtype: str
"""
for c_doc in self._catalog_docs:
for passphrase in c_doc['data']['passphrases']:
if passphrase[P_DOCUMENT_NAME] == passphrase_name:
return passphrase.get(P_ENCODING, P_DEFAULT_ENCODING)

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
from getpass import getpass
import logging
@ -69,6 +70,11 @@ class PassphraseGenerator(BaseGenerator):
if not passphrase:
passphrase = self._pass_util.get_crypto_string(
self._catalog.get_length(p_name))
encoding_method = self._catalog.get_encoding_method(p_name)
if encoding_method == 'base64':
# Convert string to bytes, then encode in base64
passphrase = passphrase.encode()
passphrase = base64.b64encode(passphrase)
docs = list()
storage_policy = self._catalog.get_storage_policy(p_name)
docs.append(self.generate_doc(

View File

@ -163,9 +163,11 @@ class PeglegSecretManagement(object):
# policies
doc_list.append(doc.embedded_document)
continue
secret_doc = doc.get_secret()
if type(secret_doc) != bytes:
secret_doc = secret_doc.encode()
doc.set_secret(
encrypt(doc.get_secret().encode(), self.passphrase, self.salt))
encrypt(secret_doc, self.passphrase, self.salt))
doc.set_encrypted(self._author)
encrypted_docs = True
doc_list.append(doc.pegleg_document)

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import os
import tempfile
import uuid
@ -23,6 +24,7 @@ from testfixtures import log_capture
import yaml
from pegleg.engine.generators.passphrase_generator import PassphraseGenerator
from pegleg.engine.util.cryptostring import CryptoString
from pegleg.engine.util import encryption
from pegleg.engine import util
import pegleg
@ -85,6 +87,32 @@ data:
...
""")
TEST_BASE64_PASSPHRASES_CATALOG = yaml.load("""
---
schema: pegleg/PassphraseCatalog/v1
metadata:
schema: metadata/Document/v1
name: cluster-passphrases
layeringDefinition:
abstract: false
layer: global
storagePolicy: cleartext
data:
passphrases:
- description: 'description of base64 required passphrases'
document_name: base64_encoded_passphrase_doc
encrypted: true
encoding: base64
- description: 'description of not base64 encoded passphrases'
document_name: not_encoded
encrypted: true
encoding: none
- description: 'description of not base64 encoded passphrases'
document_name: also_not_encoded
encrypted: true
...
""")
TEST_REPOSITORIES = {
'repositories': {
'global': {
@ -118,6 +146,7 @@ TEST_SITE_DEFINITION = {
TEST_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_PASSPHRASES_CATALOG]
TEST_GLOBAL_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_GLOBAL_PASSPHRASES_CATALOG]
TEST_BASE64_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_BASE64_PASSPHRASES_CATALOG]
@mock.patch.object(
@ -240,3 +269,66 @@ def test_global_passphrase_catalog(*_):
os.environ['PEGLEG_SALT'].encode())
if passphrase_file_name == "passphrase_from_global.yaml":
assert len(decrypted_passphrase) == 24
@mock.patch.object(
util.definition,
'documents_for_site',
autospec=True,
return_value=TEST_BASE64_SITE_DOCUMENTS)
@mock.patch.object(
pegleg.config,
'get_site_repo',
autospec=True,
return_value='cicd_site_repo')
@mock.patch.object(
util.definition,
'site_files',
autospec=True,
return_value=[
'cicd_global_repo/site/cicd/passphrases/passphrase-catalog.yaml', ])
@mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890]['})
def test_base64_passphrase_catalog(*_):
_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True)
PassphraseGenerator('cicd', _dir, 'test_author').generate()
for passphrase in TEST_BASE64_PASSPHRASES_CATALOG['data']['passphrases']:
passphrase_file_name = '{}.yaml'.format(passphrase['document_name'])
passphrase_file_path = os.path.join(_dir, 'site', 'cicd', 'secrets',
'passphrases',
passphrase_file_name)
assert os.path.isfile(passphrase_file_path)
with open(passphrase_file_path) as stream:
doc = yaml.safe_load(stream)
decrypted_passphrase = encryption.decrypt(
doc['data']['managedDocument']['data'],
os.environ['PEGLEG_PASSPHRASE'].encode(),
os.environ['PEGLEG_SALT'].encode())
if passphrase_file_name == "base64_encoded_passphrase_doc.yaml":
assert decrypted_passphrase == base64.b64encode(
base64.b64decode(decrypted_passphrase))
@mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890]['})
def test_crypt_coding_flow():
cs_util = CryptoString()
orig_passphrase = cs_util.get_crypto_string()
bytes_passphrase = orig_passphrase.encode()
b64_passphrase = base64.b64encode(bytes_passphrase)
encrypted = encryption.encrypt(b64_passphrase,
os.environ['PEGLEG_PASSPHRASE'].encode(),
os.environ['PEGLEG_SALT'].encode()
)
decrypted = encryption.decrypt(encrypted,
os.environ['PEGLEG_PASSPHRASE'].encode(),
os.environ['PEGLEG_SALT'].encode()
)
assert encrypted != decrypted
assert decrypted == b64_passphrase
assert base64.b64decode(decrypted) == bytes_passphrase
assert bytes_passphrase.decode() == orig_passphrase