Merge "Support b64 encoding of passphrase catalog"

This commit is contained in:
Zuul 2019-05-21 20:03:37 +00:00 committed by Gerrit Code Review
commit 416adce7ab
4 changed files with 119 additions and 2 deletions

View File

@ -24,8 +24,10 @@ P_LENGTH = 'length'
P_DESCRIPTION = 'description'
P_ENCRYPTED = 'encrypted'
P_CLEARTEXT = 'cleartext'
P_ENCODING = 'encoding'
P_DEFAULT_LENGTH = 24
P_DEFAULT_STORAGE_POLICY = 'encrypted'
P_DEFAULT_ENCODING = 'none'
__all__ = ['PassphraseCatalog']
@ -86,3 +88,18 @@ class PassphraseCatalog(BaseCatalog):
return P_CLEARTEXT
else:
return P_DEFAULT_STORAGE_POLICY
def get_encoding_method(self, passphrase_name):
"""Return the encoding method of the ``passphrase_name``.
If the catalog does not specify an encoding method for the
``passphrase_name``, return the default encoding method, 'none'.
:param str passphrase_name: The name of the passphrase to evaluate.
:returns: The encoding method to be used for ``passphrase_name``.
:rtype: str
"""
for c_doc in self._catalog_docs:
for passphrase in c_doc['data']['passphrases']:
if passphrase[P_DOCUMENT_NAME] == passphrase_name:
return passphrase.get(P_ENCODING, P_DEFAULT_ENCODING)

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
from getpass import getpass
import logging
@ -69,6 +70,11 @@ class PassphraseGenerator(BaseGenerator):
if not passphrase:
passphrase = self._pass_util.get_crypto_string(
self._catalog.get_length(p_name))
encoding_method = self._catalog.get_encoding_method(p_name)
if encoding_method == 'base64':
# Convert string to bytes, then encode in base64
passphrase = passphrase.encode()
passphrase = base64.b64encode(passphrase)
docs = list()
storage_policy = self._catalog.get_storage_policy(p_name)
docs.append(self.generate_doc(

View File

@ -163,9 +163,11 @@ class PeglegSecretManagement(object):
# policies
doc_list.append(doc.embedded_document)
continue
secret_doc = doc.get_secret()
if type(secret_doc) != bytes:
secret_doc = secret_doc.encode()
doc.set_secret(
encrypt(doc.get_secret().encode(), self.passphrase, self.salt))
encrypt(secret_doc, self.passphrase, self.salt))
doc.set_encrypted(self._author)
encrypted_docs = True
doc_list.append(doc.pegleg_document)

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import os
import tempfile
import uuid
@ -23,6 +24,7 @@ from testfixtures import log_capture
import yaml
from pegleg.engine.generators.passphrase_generator import PassphraseGenerator
from pegleg.engine.util.cryptostring import CryptoString
from pegleg.engine.util import encryption
from pegleg.engine import util
import pegleg
@ -85,6 +87,32 @@ data:
...
""")
TEST_BASE64_PASSPHRASES_CATALOG = yaml.load("""
---
schema: pegleg/PassphraseCatalog/v1
metadata:
schema: metadata/Document/v1
name: cluster-passphrases
layeringDefinition:
abstract: false
layer: global
storagePolicy: cleartext
data:
passphrases:
- description: 'description of base64 required passphrases'
document_name: base64_encoded_passphrase_doc
encrypted: true
encoding: base64
- description: 'description of not base64 encoded passphrases'
document_name: not_encoded
encrypted: true
encoding: none
- description: 'description of not base64 encoded passphrases'
document_name: also_not_encoded
encrypted: true
...
""")
TEST_REPOSITORIES = {
'repositories': {
'global': {
@ -118,6 +146,7 @@ TEST_SITE_DEFINITION = {
TEST_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_PASSPHRASES_CATALOG]
TEST_GLOBAL_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_GLOBAL_PASSPHRASES_CATALOG]
TEST_BASE64_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_BASE64_PASSPHRASES_CATALOG]
@mock.patch.object(
@ -240,3 +269,66 @@ def test_global_passphrase_catalog(*_):
os.environ['PEGLEG_SALT'].encode())
if passphrase_file_name == "passphrase_from_global.yaml":
assert len(decrypted_passphrase) == 24
@mock.patch.object(
util.definition,
'documents_for_site',
autospec=True,
return_value=TEST_BASE64_SITE_DOCUMENTS)
@mock.patch.object(
pegleg.config,
'get_site_repo',
autospec=True,
return_value='cicd_site_repo')
@mock.patch.object(
util.definition,
'site_files',
autospec=True,
return_value=[
'cicd_global_repo/site/cicd/passphrases/passphrase-catalog.yaml', ])
@mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890]['})
def test_base64_passphrase_catalog(*_):
_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True)
PassphraseGenerator('cicd', _dir, 'test_author').generate()
for passphrase in TEST_BASE64_PASSPHRASES_CATALOG['data']['passphrases']:
passphrase_file_name = '{}.yaml'.format(passphrase['document_name'])
passphrase_file_path = os.path.join(_dir, 'site', 'cicd', 'secrets',
'passphrases',
passphrase_file_name)
assert os.path.isfile(passphrase_file_path)
with open(passphrase_file_path) as stream:
doc = yaml.safe_load(stream)
decrypted_passphrase = encryption.decrypt(
doc['data']['managedDocument']['data'],
os.environ['PEGLEG_PASSPHRASE'].encode(),
os.environ['PEGLEG_SALT'].encode())
if passphrase_file_name == "base64_encoded_passphrase_doc.yaml":
assert decrypted_passphrase == base64.b64encode(
base64.b64decode(decrypted_passphrase))
@mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890]['})
def test_crypt_coding_flow():
cs_util = CryptoString()
orig_passphrase = cs_util.get_crypto_string()
bytes_passphrase = orig_passphrase.encode()
b64_passphrase = base64.b64encode(bytes_passphrase)
encrypted = encryption.encrypt(b64_passphrase,
os.environ['PEGLEG_PASSPHRASE'].encode(),
os.environ['PEGLEG_SALT'].encode()
)
decrypted = encryption.decrypt(encrypted,
os.environ['PEGLEG_PASSPHRASE'].encode(),
os.environ['PEGLEG_SALT'].encode()
)
assert encrypted != decrypted
assert decrypted == b64_passphrase
assert base64.b64decode(decrypted) == bytes_passphrase
assert bytes_passphrase.decode() == orig_passphrase