Convert sw-patch to use python3

Updated the library that we use for
xml manipulation to lxml.
Implemented __le__ to compare
two PackageVersion objects.
Switched to Cryptodome instead of Crypto
and updated requirements.txt
Replaced the old dict.keys() with list(dict)
Use strict=False in configparser when py3
is used

Tested and working:
query, upload, apply
local and host-install,
delete, remove, query patch details

Story: 2008454
Task: 42768

Signed-off-by: Daniel Safta <daniel.safta@windriver.com>
Change-Id: I3c68322603eaaf9a78d101b5b1198e9582497105
(cherry picked from commit 51524c09c7)
This commit is contained in:
Daniel Safta 2021-06-03 12:27:56 +03:00 committed by Charles Short
parent 37e68c0d54
commit fc3feac7d7
9 changed files with 115 additions and 83 deletions

View File

@ -13,6 +13,7 @@ BuildRequires: python2-pip
BuildRequires: python2-wheel
BuildRequires: systemd-units
BuildRequires: systemd-devel
BuildRequires: python-lxml
Requires: python-devel
Requires: python-crypto
Requires: dnf

View File

@ -6,6 +6,7 @@ SPDX-License-Identifier: Apache-2.0
"""
import os
import six
from six.moves import configparser
import io
import logging
@ -48,7 +49,18 @@ def read_config():
global controller_port
global agent_port
config = configparser.SafeConfigParser(defaults)
# In python3 configparser uses strict mode by default. It doesn't
# accept duplicate keys, and will throw an error
# In python2 the strict argument is missing
# TODO(dsafta): the logic branching here can be removed once
# https://bugs.launchpad.net/starlingx/+bug/1931529 is fixed, allowing
# python3 parser to work in strict mode.
if six.PY2:
config = configparser.SafeConfigParser(defaults)
elif six.PY3:
config = configparser.SafeConfigParser(defaults, strict=False)
config.read(patching_conf)
patching_conf_mtime = os.stat(patching_conf).st_mtime
@ -103,7 +115,10 @@ def get_mgmt_iface():
# so return the cached value.
return mgmt_if
config = configparser.SafeConfigParser()
if six.PY2:
config = configparser.SafeConfigParser()
elif six.PY3:
config = configparser.SafeConfigParser(strict=False)
# The platform.conf file has no section headers, which causes problems
# for ConfigParser. So we'll fake it out.

View File

@ -170,7 +170,7 @@ class PatchMessageHelloAgentAck(messages.PatchMessage):
global pa
self.encode()
message = json.dumps(self.message)
sock.sendto(message, (pa.controller_address, cfg.controller_port))
sock.sendto(str.encode(message), (pa.controller_address, cfg.controller_port))
class PatchMessageQueryDetailed(messages.PatchMessage):
@ -216,7 +216,7 @@ class PatchMessageQueryDetailedResp(messages.PatchMessage):
def send(self, sock):
self.encode()
message = json.dumps(self.message)
sock.sendall(message)
sock.sendall(str.encode(message))
class PatchMessageAgentInstallReq(messages.PatchMessage):
@ -277,7 +277,7 @@ class PatchMessageAgentInstallResp(messages.PatchMessage):
address = (addr[0], cfg.controller_port)
self.encode()
message = json.dumps(self.message)
sock.sendto(message, address)
sock.sendto(str.encode(message), address)
# Send a hello ack to follow it
resp = PatchMessageHelloAgentAck()
@ -498,7 +498,7 @@ class PatchAgent(PatchService):
if pkg.name not in self.duplicated_pkgs:
self.duplicated_pkgs[pkg.name] = {}
if pkg.arch not in self.duplicated_pkgs[pkg.name]:
self.duplicated_pkgs[pkg.name][pkg.arch] = map(PatchAgent.pkgobj_to_version_str, pkglist)
self.duplicated_pkgs[pkg.name][pkg.arch] = list(map(PatchAgent.pkgobj_to_version_str, pkglist))
LOG.warn("Duplicate packages installed: %s %s",
pkg.name, ", ".join(self.duplicated_pkgs[pkg.name][pkg.arch]))
@ -847,7 +847,7 @@ class PatchAgent(PatchService):
break
if packet:
data += packet
data += packet.decode()
if data == '':
break

View File

@ -223,7 +223,7 @@ def print_patch_op_result(req):
show_repo = False
for patch_id in pd.keys():
for patch_id in list(pd):
if len(patch_id) > width_id:
width_id = len(patch_id)
if len(pd[patch_id]["sw_version"]) > width_rel:
@ -244,7 +244,7 @@ def print_patch_op_result(req):
print("{0} {1} {2} {3} {4}".format(
'=' * width_id, '=' * width_rr, '=' * width_rel, '=' * width_repo, '=' * width_state))
for patch_id in sorted(pd.keys()):
for patch_id in sorted(list(pd)):
if "reboot_required" in pd[patch_id]:
rr = pd[patch_id]["reboot_required"]
else:
@ -266,7 +266,7 @@ def print_patch_op_result(req):
print("{0} {1} {2} {3}".format(
'=' * width_id, '=' * width_rr, '=' * width_rel, '=' * width_state))
for patch_id in sorted(pd.keys()):
for patch_id in sorted(list(pd)):
if "reboot_required" in pd[patch_id]:
rr = pd[patch_id]["reboot_required"]
else:
@ -302,7 +302,7 @@ def print_patch_show_result(req):
if 'metadata' in data:
pd = data['metadata']
for patch_id in sorted(pd.keys()):
for patch_id in sorted(list(pd)):
print("%s:" % patch_id)
if "sw_version" in pd[patch_id] and pd[patch_id]["sw_version"] != "":
@ -606,7 +606,7 @@ def patch_commit_req(debug, args):
data = json.loads(req.text)
if 'pd' in data:
patch_list = sorted(data['pd'].keys())
patch_list = sorted(list(data['pd']))
elif req.status_code == 500:
print("Failed to get patch list. Aborting...")
return 1
@ -907,6 +907,7 @@ def wait_for_install_complete(agent_ip):
state = None
agents = data['data']
interim_state = None
for agent in agents:
if agent['hostname'] == agent_ip \
or agent['ip'] == agent_ip:
@ -1299,7 +1300,7 @@ def completion_opts(args):
data = json.loads(req.text)
if 'pd' in data:
print(" ".join(data['pd'].keys()))
print(" ".join(list(data['pd'])))
return 0
elif args[0] == "hosts":

View File

@ -12,6 +12,7 @@ import socket
import json
import select
import subprocess
import six
from six.moves import configparser
import rpm
import os
@ -241,7 +242,7 @@ class PatchMessageHello(messages.PatchMessage):
global pc
self.encode()
message = json.dumps(self.message)
sock.sendto(message, (pc.controller_address, cfg.controller_port))
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
class PatchMessageHelloAck(messages.PatchMessage):
@ -266,7 +267,7 @@ class PatchMessageHelloAck(messages.PatchMessage):
global pc
self.encode()
message = json.dumps(self.message)
sock.sendto(message, (pc.controller_address, cfg.controller_port))
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
class PatchMessageSyncReq(messages.PatchMessage):
@ -297,7 +298,7 @@ class PatchMessageSyncReq(messages.PatchMessage):
LOG.info("sending sync req")
self.encode()
message = json.dumps(self.message)
sock.sendto(message, (pc.controller_address, cfg.controller_port))
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
class PatchMessageSyncComplete(messages.PatchMessage):
@ -324,7 +325,7 @@ class PatchMessageSyncComplete(messages.PatchMessage):
LOG.info("sending sync complete")
self.encode()
message = json.dumps(self.message)
sock.sendto(message, (pc.controller_address, cfg.controller_port))
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
class PatchMessageHelloAgent(messages.PatchMessage):
@ -344,8 +345,8 @@ class PatchMessageHelloAgent(messages.PatchMessage):
self.encode()
message = json.dumps(self.message)
local_hostname = utils.ip_to_versioned_localhost(cfg.agent_mcast_group)
sock.sendto(message, (pc.agent_address, cfg.agent_port))
sock.sendto(message, (local_hostname, cfg.agent_port))
sock.sendto(str.encode(message), (pc.agent_address, cfg.agent_port))
sock.sendto(str.encode(message), (local_hostname, cfg.agent_port))
class PatchMessageHelloAgentAck(messages.PatchMessage):
@ -414,7 +415,7 @@ class PatchMessageQueryDetailed(messages.PatchMessage):
def send(self, sock):
self.encode()
message = json.dumps(self.message)
sock.sendall(message)
sock.sendall(str.encode(message))
class PatchMessageQueryDetailedResp(messages.PatchMessage):
@ -467,7 +468,7 @@ class PatchMessageQueryDetailedResp(messages.PatchMessage):
self.agent_sw_version,
self.subfunctions,
self.agent_state)
for patch_id in pc.interim_state.keys():
for patch_id in list(pc.interim_state):
if ip in pc.interim_state[patch_id]:
pc.interim_state[patch_id].remove(ip)
if len(pc.interim_state[patch_id]) == 0:
@ -499,7 +500,7 @@ class PatchMessageAgentInstallReq(messages.PatchMessage):
LOG.info("sending install request to node: %s", self.ip)
self.encode()
message = json.dumps(self.message)
sock.sendto(message, (self.ip, cfg.agent_port))
sock.sendto(str.encode(message), (self.ip, cfg.agent_port))
class PatchMessageAgentInstallResp(messages.PatchMessage):
@ -569,7 +570,7 @@ class PatchMessageDropHostReq(messages.PatchMessage):
global pc
self.encode()
message = json.dumps(self.message)
sock.sendto(message, (pc.controller_address, cfg.controller_port))
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
class PatchController(PatchService):
@ -648,7 +649,10 @@ class PatchController(PatchService):
pass
def write_state_file(self):
config = configparser.ConfigParser()
if six.PY2:
config = configparser.ConfigParser()
elif six.PY3:
config = configparser.ConfigParser(strict=False)
cfgfile = open(state_file, 'w')
@ -658,7 +662,10 @@ class PatchController(PatchService):
cfgfile.close()
def read_state_file(self):
config = configparser.ConfigParser()
if six.PY2:
config = configparser.ConfigParser()
elif six.PY3:
config = configparser.ConfigParser(strict=False)
config.read(state_file)
@ -755,12 +762,12 @@ class PatchController(PatchService):
self.patch_data.metadata[patch_id]["patchstate"] = \
self.patch_data.metadata[patch_id]["repostate"]
for ip in self.hosts.keys():
for ip in list(self.hosts):
if not self.hosts[ip].out_of_date:
continue
for pkg in self.hosts[ip].installed.keys():
for patch_id in self.patch_data.content_versions.keys():
for pkg in list(self.hosts[ip].installed):
for patch_id in list(self.patch_data.content_versions):
if pkg not in self.patch_data.content_versions[patch_id]:
continue
@ -814,7 +821,7 @@ class PatchController(PatchService):
# Check the to_remove list
for pkg in self.hosts[ip].to_remove:
for patch_id in self.patch_data.content_versions.keys():
for patch_id in list(self.patch_data.content_versions):
if pkg not in self.patch_data.content_versions[patch_id]:
continue
@ -838,7 +845,7 @@ class PatchController(PatchService):
# Check the missing_pkgs list
for pkg in self.hosts[ip].missing_pkgs:
for patch_id in self.patch_data.content_versions.keys():
for patch_id in list(self.patch_data.content_versions):
if pkg not in self.patch_data.content_versions[patch_id]:
continue
@ -900,7 +907,7 @@ class PatchController(PatchService):
# Pass the current patch state to the semantic check as a series of args
patch_state_args = []
for patch_id in self.patch_data.metadata.keys():
for patch_id in list(self.patch_data.metadata):
patch_state = '%s=%s' % (patch_id, self.patch_data.metadata[patch_id]["patchstate"])
patch_state_args += ['-p', patch_state]
@ -1060,7 +1067,7 @@ class PatchController(PatchService):
# Set patch_ids to list of all available patches
# We're getting this list now, before we load the applied patches
patch_list = []
for patch_id in sorted(self.patch_data.metadata.keys()):
for patch_id in sorted(list(self.patch_data.metadata)):
if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE:
patch_list.append(patch_id)
@ -1198,7 +1205,7 @@ class PatchController(PatchService):
self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN
self.hosts_lock.acquire()
self.interim_state[patch_id] = self.hosts.keys()
self.interim_state[patch_id] = list(self.hosts)
self.hosts_lock.release()
repo_changed = True
@ -1283,7 +1290,7 @@ class PatchController(PatchService):
# Next, see if any of the patches are required by applied patches
# required_patches will map the required patch to the patches that need it
required_patches = {}
for patch_iter in self.patch_data.metadata.keys():
for patch_iter in list(self.patch_data.metadata):
# Ignore patches in the op set
if patch_iter in patch_list:
continue
@ -1381,7 +1388,7 @@ class PatchController(PatchService):
self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN
self.hosts_lock.acquire()
self.interim_state[patch_id] = self.hosts.keys()
self.interim_state[patch_id] = list(self.hosts)
self.hosts_lock.release()
if repo_changed:
@ -1570,7 +1577,7 @@ class PatchController(PatchService):
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# Delete patch XML files
for patch_id in self.patch_data.metadata.keys():
for patch_id in list(self.patch_data.metadata):
if self.patch_data.metadata[patch_id]["sw_version"] != release:
continue
@ -1682,7 +1689,7 @@ class PatchController(PatchService):
return dict(info=msg_info, warning=msg_warning, error=msg_error)
required_patches = {}
for patch_iter in self.patch_data.metadata.keys():
for patch_iter in list(self.patch_data.metadata):
for req_patch in self.patch_data.metadata[patch_iter]["requires"]:
if req_patch not in patch_ids:
continue
@ -1795,7 +1802,7 @@ class PatchController(PatchService):
self.patch_data_lock.acquire()
for patch_id in patch_ids:
if patch_id not in self.patch_data.metadata.keys():
if patch_id not in list(self.patch_data.metadata):
results["error"] += "%s is unrecognized\n" % patch_id
for patch_id, data in self.patch_data.metadata.items():
@ -1850,7 +1857,7 @@ class PatchController(PatchService):
# Verify patch IDs
for patch_id in sorted(patch_ids):
if patch_id not in self.patch_data.metadata.keys():
if patch_id not in list(self.patch_data.metadata):
errormsg = "%s is unrecognized\n" % patch_id
LOG.info("patch_query_dependencies: %s", errormsg)
results["error"] += errormsg
@ -1907,7 +1914,7 @@ class PatchController(PatchService):
# Verify patch IDs
self.patch_data_lock.acquire()
for patch_id in sorted(patch_ids):
if patch_id not in self.patch_data.metadata.keys():
if patch_id not in list(self.patch_data.metadata):
errormsg = "%s is unrecognized\n" % patch_id
LOG.info("patch_commit: %s", errormsg)
results["error"] += errormsg
@ -2062,10 +2069,10 @@ class PatchController(PatchService):
output = []
self.hosts_lock.acquire()
for nbr in self.hosts.keys():
for nbr in list(self.hosts):
host = self.hosts[nbr].get_dict()
host["interim_state"] = False
for patch_id in pc.interim_state.keys():
for patch_id in list(pc.interim_state):
if nbr in pc.interim_state[patch_id]:
host["interim_state"] = True
@ -2198,7 +2205,7 @@ class PatchController(PatchService):
# Because the host may be getting dropped due to deletion,
# we may be unable to do a hostname lookup. Instead, we'll
# iterate through the table here.
for host in self.hosts.keys():
for host in list(self.hosts):
if host_ip == self.hosts[host].hostname:
ip = host
break
@ -2219,7 +2226,7 @@ class PatchController(PatchService):
audit_log_info(msg)
del self.hosts[ip]
for patch_id in self.interim_state.keys():
for patch_id in list(self.interim_state):
if ip in self.interim_state[patch_id]:
self.interim_state[patch_id].remove(ip)
@ -2555,7 +2562,7 @@ class PatchControllerMainThread(threading.Thread):
break
if packet:
data += packet
data += packet.decode()
if data == '':
break
@ -2645,7 +2652,7 @@ class PatchControllerMainThread(threading.Thread):
# Age out neighbours
pc.controller_neighbours_lock.acquire()
nbrs = pc.controller_neighbours.keys()
nbrs = list(pc.controller_neighbours)
for n in nbrs:
# Age out controllers after 2 minutes
if pc.controller_neighbours[n].get_age() >= 120:
@ -2654,13 +2661,13 @@ class PatchControllerMainThread(threading.Thread):
pc.controller_neighbours_lock.release()
pc.hosts_lock.acquire()
nbrs = pc.hosts.keys()
nbrs = list(pc.hosts)
for n in nbrs:
# Age out hosts after 1 hour
if pc.hosts[n].get_age() >= 3600:
LOG.info("Aging out host %s from table", n)
del pc.hosts[n]
for patch_id in pc.interim_state.keys():
for patch_id in list(pc.interim_state):
if n in pc.interim_state[patch_id]:
pc.interim_state[patch_id].remove(n)

View File

@ -17,7 +17,7 @@ import subprocess
import sys
import tarfile
import tempfile
import xml.etree.ElementTree as ElementTree
from lxml import etree as ElementTree
from xml.dom import minidom
from cgcs_patch.patch_verify import verify_files
@ -146,7 +146,7 @@ def write_xml_file(top,
fname):
# Generate the file, in a readable format if possible
outfile = open(fname, 'w')
rough_xml = ElementTree.tostring(top, 'utf-8')
rough_xml = ElementTree.tostring(top)
if platform.python_version() == "2.7.2":
# The 2.7.2 toprettyxml() function unnecessarily indents
# childless tags, adding whitespace. In the case of the
@ -227,6 +227,13 @@ class PackageVersion(object):
self.version = version
self.release = release
def __le__(self, other):
    """Return True if this package version is <= *other*.

    Delegates the epoch/version/release comparison to
    rpm.labelCompare, which returns 1 when the first tuple is
    newer, 0 when equal, and -1 when older — so "not newer"
    means less-than-or-equal.

    NOTE(review): only __le__ is defined here; sorted() and the
    ``<`` operator use __lt__, which python3 will not derive from
    __le__/__cmp__ — confirm callers only use ``<=``.
    """
    out = rpm.labelCompare((self.epoch, self.version, self.release),
                           (other.epoch, other.version, other.release))
    # labelCompare == 1 means self is strictly newer than other.
    return out != 1
def __cmp__(self, other):
"""
This function is called by comparison operators to compare
@ -378,42 +385,42 @@ class PatchData(object):
self.semantics.update(new_patch.semantics)
# Need to recursively update package_version and keys dicts
for patch_sw_version in new_patch.package_versions.keys():
for patch_sw_version in list(new_patch.package_versions):
if patch_sw_version not in self.package_versions:
self.package_versions[patch_sw_version] = {}
for pkgname in new_patch.package_versions[patch_sw_version].keys():
for pkgname in list(new_patch.package_versions[patch_sw_version]):
if pkgname not in self.package_versions[patch_sw_version]:
self.package_versions[patch_sw_version][pkgname] = {}
for arch in new_patch.package_versions[patch_sw_version][pkgname].keys():
for arch in list(new_patch.package_versions[patch_sw_version][pkgname]):
if arch not in self.package_versions[patch_sw_version][pkgname]:
self.package_versions[patch_sw_version][pkgname][arch] = {}
for pkgver in new_patch.package_versions[patch_sw_version][pkgname][arch].keys():
for pkgver in list(new_patch.package_versions[patch_sw_version][pkgname][arch]):
self.package_versions[patch_sw_version][pkgname][arch][pkgver] = patch_id
for patch_sw_version in new_patch.groups.keys():
for patch_sw_version in list(new_patch.groups):
if patch_sw_version not in self.groups:
self.groups[patch_sw_version] = {}
for ptype in new_patch.groups[patch_sw_version].keys():
for ptype in list(new_patch.groups[patch_sw_version]):
if ptype not in self.groups[patch_sw_version]:
self.groups[patch_sw_version][ptype] = {}
for patch_id in new_patch.groups[patch_sw_version][ptype].keys():
for patch_id in list(new_patch.groups[patch_sw_version][ptype]):
if patch_id not in self.groups[patch_sw_version][ptype]:
self.groups[patch_sw_version][ptype][patch_id] = {}
self.groups[patch_sw_version][ptype][patch_id].update(
new_patch.groups[patch_sw_version][ptype][patch_id])
def update_patch(self, updated_patch):
for patch_id in updated_patch.metadata.keys():
for patch_id in list(updated_patch.metadata):
# Update all fields except repostate
cur_repostate = self.metadata[patch_id]['repostate']
self.metadata[patch_id].update(updated_patch.metadata[patch_id])
self.metadata[patch_id]['repostate'] = cur_repostate
def delete_patch(self, patch_id):
for patch_sw_version in self.package_versions.keys():
for pkgname in self.package_versions[patch_sw_version].keys():
for arch in self.package_versions[patch_sw_version][pkgname].keys():
for pkgver in self.package_versions[patch_sw_version][pkgname][arch].keys():
for patch_sw_version in list(self.package_versions):
for pkgname in list(self.package_versions[patch_sw_version]):
for arch in list(self.package_versions[patch_sw_version][pkgname]):
for pkgver in list(self.package_versions[patch_sw_version][pkgname][arch]):
if self.package_versions[patch_sw_version][pkgname][arch][pkgver] == patch_id:
del self.package_versions[patch_sw_version][pkgname][arch][pkgver]
if len(self.package_versions[patch_sw_version][pkgname][arch]) == 0:
@ -423,8 +430,8 @@ class PatchData(object):
if len(self.package_versions[patch_sw_version]) == 0:
del self.package_versions[patch_sw_version]
for patch_sw_version in self.groups.keys():
for ptype in self.groups[patch_sw_version].keys():
for patch_sw_version in list(self.groups):
for ptype in list(self.groups[patch_sw_version]):
if patch_id in self.groups[patch_sw_version][ptype]:
del self.groups[patch_sw_version][ptype][patch_id]
@ -636,7 +643,7 @@ class PatchData(object):
fname = "%s/comps.xml" % output_dir
top = ElementTree.Element('comps')
if sw_version in self.groups:
for groupname in sorted(self.groups[sw_version].keys()):
for groupname in sorted(list(self.groups[sw_version])):
if self.groups[sw_version][groupname]:
group = ElementTree.SubElement(top, 'group')
@ -770,7 +777,7 @@ class PatchMetadata(object):
add_text_tag_to_xml(top, 'apply_active_release_only',
self.apply_active_release_only)
for groupname in sorted(self.groups.keys()):
for groupname in sorted(list(self.groups)):
if self.groups[groupname]:
group = ElementTree.SubElement(top,
'personality',
@ -780,7 +787,7 @@ class PatchMetadata(object):
add_text_tag_to_xml(group, 'package', pkg)
content = ElementTree.SubElement(top, 'contents')
for rpmname in sorted(self.contents.keys()):
for rpmname in sorted(list(self.contents)):
add_text_tag_to_xml(content, 'rpm', rpmname)
req = ElementTree.SubElement(top, 'requires')
@ -873,7 +880,7 @@ class PatchFile(object):
os.chdir(tmpdir)
# Copy RPM files to tmpdir
for rpmfile in self.rpmlist.keys():
for rpmfile in list(self.rpmlist):
shutil.copy(rpmfile, tmpdir)
# add file signatures to RPMs
@ -887,14 +894,14 @@ class PatchFile(object):
# generate tar file
tar = tarfile.open("software.tar", "w")
for rpmfile in self.rpmlist.keys():
for rpmfile in list(self.rpmlist):
tar.add(os.path.basename(rpmfile))
tar.close()
# Copy semantics to tmpdir, if any
if len(self.semantics) > 0:
tar = tarfile.open("semantics.tar", "w")
for action in self.semantics.keys():
for action in list(self.semantics):
os.mkdir(action, 0o755)
sname = os.path.join(action, self.meta.id)
shutil.copy(self.semantics[action], sname)

View File

@ -6,10 +6,10 @@ SPDX-License-Identifier: Apache-2.0
"""
import os
from Crypto.Signature import PKCS1_PSS
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA # pylint: disable=unused-import
from Crypto.Util.asn1 import DerSequence # pylint: disable=unused-import
from Cryptodome.Signature import PKCS1_PSS
from Cryptodome.Hash import SHA256
from Cryptodome.PublicKey import RSA # pylint: disable=unused-import
from Cryptodome.Util.asn1 import DerSequence # pylint: disable=unused-import
from binascii import a2b_base64 # pylint: disable=unused-import
from cgcs_patch.patch_verify import read_RSA_key
from cgcs_patch.patch_verify import cert_type_formal_str
@ -55,7 +55,7 @@ def sign_files(filenames, signature_file, private_key=None, cert_type=None):
if private_key is None:
if cert_type is not None:
# A Specific key is asked for
assert (cert_type in private_key_files.keys()), "cert_type=%s is not a known cert type" % cert_type
assert (cert_type in list(private_key_files)), "cert_type=%s is not a known cert type" % cert_type
dict_key = cert_type
filename = private_key_files[dict_key]
# print 'cert_type given: Checking to see if ' + filename + ' exists\n'

View File

@ -8,11 +8,11 @@ SPDX-License-Identifier: Apache-2.0
import os
import logging
from Crypto.Signature import PKCS1_v1_5
from Crypto.Signature import PKCS1_PSS
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Util.asn1 import DerSequence
from Cryptodome.Signature import PKCS1_v1_5
from Cryptodome.Signature import PKCS1_PSS
from Cryptodome.Hash import SHA256
from Cryptodome.PublicKey import RSA
from Cryptodome.Util.asn1 import DerSequence
from binascii import a2b_base64
from cgcs_patch.certificates import dev_certificate
@ -112,10 +112,10 @@ def get_public_certificates():
# encrypted with our formal private key. If the file is present (and valid)
# then we add the developer key to the approved certificates list
if os.path.exists(dev_certificate_marker):
with open(dev_certificate_marker) as infile:
with open(dev_certificate_marker, 'rb') as infile:
signature = infile.read()
data_hash = SHA256.new()
data_hash.update('Titanium patching')
data_hash.update(b'Titanium patching')
if verify_hash(data_hash, signature, cert_list):
cert_list.append(dev_certificate)
else:

View File

@ -5,5 +5,6 @@
keystonemiddleware
oslo_config
pecan
pycryptodome;python_version=='2.7'
pycryptodomex
lxml
requests_toolbelt