Patch Signature check failed after promotion.

Problem: modify_patch.sh was signature unaware.
When it was used to promote a patch to 'rel' status,
it would be re-assembled with a 'dev' signing key,
regardless of which key was used to sign the original
patch.

Solution: Make modify_patch.sh aware of signatures,
and force it to try and use the same signature as was
already in use.  This may require it to submit the patch
to the signing server.  If signing failes, the overall
patch modification will fail.

Change-Id: I5319062b910259f4eaff3df1504216f09536c84e
Signed-off-by: Scott Little <scott.little@windriver.com>
This commit is contained in:
Scott Little 2018-05-18 15:11:02 -04:00
parent 58c497826a
commit 58d8127094
4 changed files with 163 additions and 38 deletions

View File

@ -1571,7 +1571,8 @@ def modify_patch():
try:
temp_rpm_db_dir = "%s/%s" % (workdir, ".rpmdb")
if patch_path is not None:
PatchFile.modify_patch(patch_path, "status", new_status)
rc = PatchFile.modify_patch(patch_path, "status", new_status)
assert(rc == True)
print "Patch '%s' has been modified to status '%s'" % (patch_path, new_status)
else:
if sw_version is None or patch_id is None:
@ -1586,7 +1587,8 @@ def modify_patch():
print "patch_id = %s" % patch_id
print "patch_file_name = %s" % patch_file_name
print "patch_path = %s" % patch_path
PatchFile.modify_patch(patch_path, "status", new_status)
rc = PatchFile.modify_patch(patch_path, "status", new_status)
assert(rc == True)
os.chdir(pl._std_patch_git_path(".."))
issue_cmd("git add %s" % patch_path)
issue_cmd("git commit -m \"Modify status of patch '%s' to '%s'\"" % (patch_id, new_status))
@ -1699,9 +1701,6 @@ def modify_patch():
finally:
shutil.rmtree(workdir)
# PatchRecipeData
# prd.metadata['STATUS'] = new_status
def query_patch_usage():
msg = "query_patch [ --sw_version <version> --id <patch_id> | --file <patch_path.patch> ] [ --field <field_name> ]"
@ -1759,7 +1758,14 @@ def query_patch():
temp_rpm_db_dir = "%s/%s" % (workdir, ".rpmdb")
if patch_path is not None:
answer = PatchFile.query_patch(patch_path, field=field)
print str(answer)
field_order=['id', 'sw_version', 'status', 'cert', 'reboot_required', 'unremovable', 'summary', 'description', 'install_instructions', 'warnings']
for k in field_order:
if k in answer.keys():
print "%s: '%s'" % (k, answer[k])
# Print any remaining fields, any order
for k in answer.keys():
if k not in field_order:
print "%s: '%s'" % (k, answer[k])
else:
if sw_version is None or patch_id is None:
print "--sw_version and --id are required"

View File

@ -20,7 +20,7 @@ import tempfile
import xml.etree.ElementTree as ElementTree
from xml.dom import minidom
from cgcs_patch.patch_verify import verify_files
from cgcs_patch.patch_verify import verify_files, cert_type_all
from cgcs_patch.patch_signing import sign_files
from cgcs_patch.exceptions import MetadataFail, PatchFail, PatchValidationFailure, PatchMismatchFailure
@ -802,8 +802,8 @@ class PatchFile:
print "Patch is %s" % patchfile
@staticmethod
def write_patch(patchfile):
# SAL: Write the patch file. Assumes we are in a directory containing metadata.tar, and software.tar
def write_patch(patchfile, cert_type=None):
# Write the patch file. Assumes we are in a directory containing metadata.tar, and software.tar.
# Generate the metadata tarfile
tar = tarfile.open("metadata.tar", "w")
@ -820,8 +820,13 @@ class PatchFile:
sigfile.close()
# Generate the detached signature
sign_files(['metadata.tar', 'software.tar'],
detached_signature_file)
#
# Note: if cert_type requests a formal signature, but the signing key
# is not found, we'll instead sign with the 'dev' key and
# need_resign_with_formal is set to True.
need_resign_with_formal = sign_files(['metadata.tar', 'software.tar'],
detached_signature_file,
cert_type=cert_type)
# Create the patch
tar = tarfile.open(patchfile, "w:gz")
@ -831,12 +836,27 @@ class PatchFile:
tar.add(detached_signature_file)
tar.close()
if need_resign_with_formal:
try:
# Try to ensure "sign_patch_formal.sh" will be in our PATH
if 'MY_REPO' in os.environ:
os.environ['PATH'] += os.pathsep + os.environ['MY_REPO'] + "/build-tools"
if 'MY_PATCH_REPO' in os.environ:
os.environ['PATH'] += os.pathsep + os.environ['MY_PATCH_REPO'] + "/build-tools"
# Note: This can fail if the user is not authorized to sign with the formal key.
subprocess.check_call(["sign_patch_formal.sh", patchfile])
except subprocess.CalledProcessError as e:
print "Failed to sign official patch. Call to sign_patch_formal.sh process returned non-zero exit status %i" % e.returncode
raise SystemExit(e.returncode)
@staticmethod
def read_patch(path, metadata_only=False):
def read_patch(path, metadata_only=False, cert_type=None):
# We want to enable signature checking by default
# Note: cert_type=None is required if we are to enforce 'no dev patches on a formal load' rule.
verify_signature = True
# SAL: Open the patch file and extract the contents to the current dir
# Open the patch file and extract the contents to the current dir
tar = tarfile.open(path, "r:gz")
tar.extract("metadata.tar")
tar.extract("software.tar")
@ -880,17 +900,21 @@ class PatchFile:
filenames=["metadata.tar", "software.tar"]
sig_valid = verify_files(
filenames,
detached_signature_file)
detached_signature_file,
cert_type=cert_type)
if sig_valid is True:
msg = "Signature verified, patch has been signed"
LOG.info(msg)
if cert_type is None:
LOG.info(msg)
else:
msg = "Signature check failed"
LOG.error(msg)
if cert_type is None:
LOG.error(msg)
raise PatchValidationFailure(msg)
else:
msg = "Patch has not been signed"
LOG.error(msg)
if cert_type is None:
LOG.error(msg)
raise PatchValidationFailure(msg)
tar = tarfile.open("metadata.tar")
@ -917,17 +941,40 @@ class PatchFile:
r = {}
try:
PatchFile.read_patch(abs_patch, metadata_only=True)
if field is None or field == "cert":
# Need to determine the cert_type
for cert_type_str in cert_type_all:
try:
PatchFile.read_patch(abs_patch, metadata_only=True, cert_type=[cert_type_str])
except PatchValidationFailure as e:
pass;
else:
# Successfully opened the file for reading, and we have discovered the cert_type
r["cert"] = cert_type_str
break;
if "cert" not in r:
# If cert is unknown, then file is not yet open for reading.
# Try to open it for reading now, using all available keys.
# We can't omit cert_type, or pass None, because that will trigger the code
# path used by installed product, in which dev keys are not accepted unless
# a magic file exists.
PatchFile.read_patch(abs_patch, metadata_only=True, cert_type=cert_type_all)
thispatch = PatchData()
patch_id = thispatch.parse_metadata("metadata.xml")
r["id"] = patch_id
if field is None or field == "id":
r["id"] = patch_id
if field is None:
for f in ["status", "unremovable", "summary",
for f in ["status", "sw_version", "unremovable", "summary",
"description", "install_instructions",
"warnings", "reboot_required"]:
r[f] = thispatch.query_line(patch_id, f)
else:
r[field] = thispatch.query_line(patch_id, field)
if field not in [ 'id', 'cert' ]:
r[field] = thispatch.query_line(patch_id, field)
except PatchValidationFailure as e:
msg = "Patch validation failed during extraction"
@ -951,7 +998,7 @@ class PatchFile:
def modify_patch(patch,
key,
value):
rc = False
abs_patch = os.path.abspath(patch)
new_abs_patch = "%s.new" % abs_patch
@ -965,10 +1012,15 @@ class PatchFile:
os.chdir(tmpdir)
try:
PatchFile.read_patch(abs_patch, metadata_only=True)
cert_type = None
meta_data = PatchFile.query_patch(abs_patch)
if 'cert' in meta_data:
cert_type = meta_data['cert']
PatchFile.read_patch(abs_patch, metadata_only=True, cert_type=cert_type)
PatchData.modify_metadata_text("metadata.xml", key, value)
PatchFile.write_patch(new_abs_patch)
os.rename(new_abs_patch, abs_patch)
PatchFile.write_patch(new_abs_patch, cert_type=cert_type)
os.rename(new_abs_patch, abs_patch)
rc = True
except PatchValidationFailure as e:
raise e
@ -978,11 +1030,15 @@ class PatchFile:
msg = "Failed during patch extraction"
LOG.exception(msg)
raise PatchValidationFailure(msg)
except Exception as e:
template = "An exception of type {0} occurred. Arguments:\n{1!r}"
message = template.format(type(e).__name__, e.args)
print message
finally:
# Change back to original working dir
os.chdir(orig_wd)
shutil.rmtree(tmpdir)
return
return rc
@staticmethod
def extract_patch(patch,

View File

@ -11,7 +11,7 @@ from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Util.asn1 import DerSequence
from binascii import a2b_base64
from cgcs_patch.patch_verify import read_RSA_key
from cgcs_patch.patch_verify import read_RSA_key, cert_type_formal_str, cert_type_dev_str
# To save memory, read and hash 1M of files at a time
default_blocksize=1*1024*1024
@ -20,12 +20,12 @@ default_blocksize=1*1024*1024
#
# The (currently hardcoded) path on the signing server will be replaced
# by the capability to specify filename from calling function.
private_key_files=['/signing/keys/formal-private-key.pem',
os.path.expandvars('$MY_REPO/build-tools/signing/dev-private-key.pem')
]
private_key_files={cert_type_formal_str: '/signing/keys/formal-private-key.pem',
cert_type_dev_str: os.path.expandvars('$MY_REPO/build-tools/signing/dev-private-key.pem')
}
def sign_files(filenames, signature_file, private_key=None):
def sign_files(filenames, signature_file, private_key=None, cert_type=None):
"""
Utility function for signing data in files.
:param filenames: A list of files containing the data to be signed
@ -34,6 +34,8 @@ def sign_files(filenames, signature_file, private_key=None):
:param private_key: If specified, sign with this private key. Otherwise,
the files in private_key_files will be searched for
and used, if found.
:param cert_type: If specified, and private_key is not specified, sign
with a key of the specified type. e.g. 'dev' or 'formal'
"""
# Hash the data across all files
@ -47,14 +49,33 @@ def sign_files(filenames, signature_file, private_key=None):
data=infile.read(blocksize)
# Find a private key to use, if not already provided
need_resign_with_formal = False
if private_key is None:
for filename in private_key_files:
# print 'Checking to see if ' + filename + ' exists\n'
if cert_type is not None:
# A Specific key is asked for
assert (cert_type in private_key_files.keys()),"cert_type=%s is not a known cert type" % cert_type
dict_key = cert_type
filename = private_key_files[dict_key]
# print 'cert_type given: Checking to see if ' + filename + ' exists\n'
if not os.path.exists(filename) and dict_key == cert_type_formal_str:
# The formal key is asked for, but is not locally available,
# substitute the dev key, and we will try to resign with the formal later.
dict_key = cert_type_dev_str
filename = private_key_files[dict_key]
need_resign_with_formal = True
if os.path.exists(filename):
# print 'Getting private key from ' + filename + '\n'
private_key = read_RSA_key(open(filename, 'rb').read())
else:
# Search for available keys
for dict_key in private_key_files.keys():
filename = private_key_files[dict_key]
# print 'Search for available keys: Checking to see if ' + filename + ' exists\n'
if os.path.exists(filename):
# print 'Getting private key from ' + filename + '\n'
private_key = read_RSA_key(open(filename, 'rb').read())
assert (private_key is not None),"Could not find private signing key"
assert (private_key is not None),"Could not find signing key"
# Encrypt the hash (sign the data) with the key we find
signer = PKCS1_PSS.new(private_key)
@ -64,3 +85,4 @@ def sign_files(filenames, signature_file, private_key=None):
with open(signature_file, 'wb') as outfile:
outfile.write(signature)
return need_resign_with_formal

View File

@ -23,6 +23,11 @@ default_blocksize=1*1024*1024
dev_certificate_marker='/etc/pki/wrs/dev_certificate_enable.bin'
LOG = logging.getLogger('main_logger')
cert_type_dev_str='dev'
cert_type_formal_str='formal'
cert_type_dev=[cert_type_dev_str]
cert_type_formal=[cert_type_formal_str]
cert_type_all=[cert_type_dev_str, cert_type_formal_str]
def verify_hash(data_hash, signature_bytes, certificate_list):
"""
@ -49,13 +54,45 @@ def verify_hash(data_hash, signature_bytes, certificate_list):
# since we want to generate detached sigs that a customer can validate
# OpenSSL
verifier = PKCS1_PSS.new(pub_key)
verified = verifier.verify(data_hash, signature_bytes)
try:
verified = verifier.verify(data_hash, signature_bytes)
except ValueError as e:
verified = False
pass
if not verified:
verifier = PKCS1_v1_5.new(pub_key)
verified = verifier.verify(data_hash, signature_bytes)
try:
verified = verifier.verify(data_hash, signature_bytes)
except ValueError as e:
verified = False
pass
return verified
def get_public_certificates_by_type(cert_type=cert_type_all):
"""
Builds a list of accepted certificates which can be used to validate
further things. This list may contain multiple certificates depending on
the configuration of the system and the value of cert_type.
:param cert_type: A list of strings, certificate types to include in list
'formal' - include formal certificate if available
'dev' - include developer certificate if available
:return: A list of certificates in PEM format
"""
cert_list = []
if cert_type_formal_str in cert_type:
cert_list.append(formal_certificate)
if cert_type_dev_str in cert_type:
cert_list.append(dev_certificate)
return cert_list
def get_public_certificates():
"""
Builds a list of accepted certificates which can be used to validate
@ -117,13 +154,14 @@ def read_RSA_key(key_data):
return key
def verify_files(filenames, signature_file):
def verify_files(filenames, signature_file, cert_type=None):
"""
Verify data files against a detached signature.
:param filenames: A list of files containing the data which was signed
:param public_key_file: A file containing the public key or certificate
corresponding to the key which signed the data
:param signature_file: The name of the file containing the signature
:param cert_type: Only use specified certififcate type to verify (dev/formal)
:return: True if the signature was verified, False otherwise
"""
@ -142,6 +180,9 @@ def verify_files(filenames, signature_file):
signature_bytes = sig_file.read()
# Verify the signature
certificate_list = get_public_certificates()
if cert_type is None:
certificate_list = get_public_certificates()
else:
certificate_list = get_public_certificates_by_type(cert_type=cert_type)
return verify_hash(data_hash, signature_bytes, certificate_list)