Merge "Debian: Patch current implementation"

This commit is contained in:
Zuul 2022-05-17 19:56:17 +00:00 committed by Gerrit Code Review
commit 36998dde90
6 changed files with 187 additions and 224 deletions

View File

@ -56,6 +56,8 @@ class PatchAPIController(object):
except PatchError as e:
return dict(error="Error: %s" % str(e))
pc.send_latest_feed_commit_to_agent()
pc.patch_sync()
return result
@ -71,6 +73,8 @@ class PatchAPIController(object):
except PatchError as e:
return dict(error="Error: %s" % str(e))
pc.send_latest_feed_commit_to_agent()
pc.patch_sync()
return result

View File

@ -19,6 +19,7 @@ PATCHMSG_QUERY_DETAILED_RESP = 8
PATCHMSG_AGENT_INSTALL_REQ = 9
PATCHMSG_AGENT_INSTALL_RESP = 10
PATCHMSG_DROP_HOST_REQ = 11
PATCHMSG_SEND_LATEST_FEED_COMMIT = 12
PATCHMSG_STR = {
PATCHMSG_UNKNOWN: "unknown",
@ -33,6 +34,7 @@ PATCHMSG_STR = {
PATCHMSG_AGENT_INSTALL_REQ: "agent-install-req",
PATCHMSG_AGENT_INSTALL_RESP: "agent-install-resp",
PATCHMSG_DROP_HOST_REQ: "drop-host-req",
PATCHMSG_SEND_LATEST_FEED_COMMIT: "send-latest-feed-commit",
}

View File

@ -4,9 +4,13 @@ Copyright (c) 2022 Wind River Systems, Inc.
SPDX-License-Identifier: Apache-2.0
"""
import logging
import subprocess
from cgcs_patch import constants
from cgcs_patch.exceptions import OSTreeCommandFail
LOG = logging.getLogger('main_logger')
def get_ostree_latest_commit(ostree_ref, repo_path):
@ -17,7 +21,7 @@ def get_ostree_latest_commit(ostree_ref, repo_path):
example: starlingx
:param repo_path: the path to the ostree repo:
example: /var/www/pages/feed/rel-22.06/ostree_repo
:return: a tuple of the most recent commit and checksum
:return: The most recent commit of the repo
"""
# Sample command and output that is parsed to get the commit and checksum
@ -39,16 +43,21 @@ def get_ostree_latest_commit(ostree_ref, repo_path):
# Commit-id: starlingx-intel-x86-64-20220428180512
cmd = "ostree log %s --repo=%s" % (ostree_ref, repo_path)
output = subprocess.run(cmd, shell=True, check=True, capture_output=True)
try:
output = subprocess.run(cmd, shell=True, check=True, capture_output=True)
except subprocess.CalledProcessError as e:
info_msg = "OSTree log Error: return code: %s , Output: %s" \
% (e.returncode, e.stderr.decode("utf-8"))
LOG.info(info_msg)
msg = "Failed to fetch ostree log for %s." % repo_path
raise OSTreeCommandFail(msg)
# Store the output of the above command in a string
output_string = output.stdout.decode('utf-8')
# Parse the string to get the latest commit and checksum for that ostree
# Parse the string to get the latest commit for the ostree
split_output_string = output_string.split()
latest_commit = split_output_string[1]
latest_checksum = split_output_string[3]
return (latest_commit, latest_checksum)
return latest_commit
def get_feed_latest_commit(patch_sw_version):
@ -57,8 +66,7 @@ def get_feed_latest_commit(patch_sw_version):
:param patch_sw_version: software version for the feed
example: 22.06
:return: a tuple of the most recent commit and checksum
for that feed
:return: The latest commit for the feed repo
"""
repo_path = "%s/rel-%s/ostree_repo" % (constants.FEED_OSTREE_BASE_DIR,
patch_sw_version)
@ -68,6 +76,47 @@ def get_feed_latest_commit(patch_sw_version):
def get_sysroot_latest_commit():
"""
Query ostree sysroot to determine the currently active commit
:return: a tuple of the commit and checksum for sysroot
:return: The latest commit for sysroot repo
"""
return get_ostree_latest_commit(constants.OSTREE_REF, constants.SYSROOT_OSTREE)
def get_latest_deployment_commit():
    """
    Get the active deployment commit ID

    Runs ``ostree admin status`` and returns the third whitespace-separated
    token counted from the ``*`` marker in its output, i.e. the commit of
    the entry flagged with ``*``.

    :return: The commit ID associated with the active commit, exactly as
             printed by ostree (any ``.N`` suffix such as ``.0`` is kept)
    :raises OSTreeCommandFail: if the command exits with a non-zero status
             or its output contains no parsable ``*`` entry
    """

    # Sample command and output that is parsed to get the active commit
    # associated with the deployment
    #
    # Command: ostree admin status
    #
    # Output:
    #
    # debian 0658a62854647b89caf5c0e9ed6ff62a6c98363ada13701d0395991569248d7e.0 (pending)
    # origin refspec: starlingx
    # * debian a5d8f8ca9bbafa85161083e9ca2259ff21e5392b7595a67f3bc7e7ab8cb583d9.0
    # Unlocked: hotfix
    # origin refspec: starlingx

    cmd = "ostree admin status"

    try:
        output = subprocess.run(cmd, shell=True, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        msg = "Failed to fetch ostree admin status."
        info_msg = "OSTree Admin Status Error: return code: %s , Output: %s" \
                   % (e.returncode, e.stderr.decode("utf-8"))
        LOG.info(info_msg)
        raise OSTreeCommandFail(msg)

    # Store the output of the above command in a string
    output_string = output.stdout.decode('utf-8')

    # Parse the string to get the active commit on this deployment
    # Trim everything before * as * represents the active deployment commit
    marker_position = output_string.find("*")
    if marker_position >= 0:
        split_output_string = output_string[marker_position:].split()
    else:
        split_output_string = []

    # Expected tokens from the marker onwards: "*", <osname>, <commit>, ...
    # Report a missing marker or a truncated entry through the same exception
    # type as a failed command instead of a bare ValueError/IndexError.
    if len(split_output_string) < 3:
        msg = "Failed to find the active deployment in ostree admin status."
        LOG.info("OSTree Admin Status Error: no active deployment entry "
                 "in output: %s", output_string)
        raise OSTreeCommandFail(msg)

    active_deployment_commit = split_output_string[2]
    return active_deployment_commit

View File

@ -89,6 +89,25 @@ def check_install_uuid():
return True
class PatchMessageSendLatestFeedCommit(messages.PatchMessage):
    """
    Agent-side representation of the PATCHMSG_SEND_LATEST_FEED_COMMIT
    message.

    Decoding records the feed commit carried by the message on the
    module-level ``pa`` object, and handling the message re-runs ``pa.query()``.
    NOTE(review): ``pa`` is presumably the PatchAgent singleton - confirm.
    """

    def __init__(self):
        super().__init__(messages.PATCHMSG_SEND_LATEST_FEED_COMMIT)

    def decode(self, data):
        """
        Decode the common message fields, then copy ``latest_feed_commit``
        from ``data`` onto ``pa`` when the key is present.
        """
        super().decode(data)

        commit_key = 'latest_feed_commit'
        if commit_key not in data:
            # Nothing to record; leave pa.latest_feed_commit untouched
            return
        pa.latest_feed_commit = data[commit_key]

    def encode(self):
        """ Encode only the common message fields """
        super().encode()

    def handle(self, sock, addr):
        """
        React to the message by querying the patch state.

        ``sock`` and ``addr`` are not used by this handler.
        """
        # Check if the node is patch current
        pa.query()
class PatchMessageHelloAgent(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_AGENT)
@ -185,10 +204,7 @@ class PatchMessageQueryDetailedResp(messages.PatchMessage):
def encode(self):
global pa
messages.PatchMessage.encode(self)
self.message['installed'] = pa.installed
self.message['to_remove'] = pa.to_remove
self.message['missing_pkgs'] = pa.missing_pkgs
self.message['duplicated_pkgs'] = pa.duplicated_pkgs
self.message['latest_sysroot_commit'] = pa.latest_sysroot_commit
self.message['nodetype'] = cfg.nodetype
self.message['sw_version'] = SW_VERSION
self.message['subfunctions'] = subfunctions
@ -276,11 +292,8 @@ class PatchAgent(PatchService):
self.controller_address = None
self.listener = None
self.changes = False
self.installed = {}
self.to_install = {}
self.to_remove = []
self.missing_pkgs = []
self.duplicated_pkgs = {}
self.latest_feed_commit = None
self.latest_sysroot_commit = None
self.patch_op_counter = 0
self.node_is_patched = os.path.exists(node_is_patched_file)
self.node_is_patched_timestamp = 0
@ -324,63 +337,37 @@ class PatchAgent(PatchService):
self.listener.bind(('', self.port))
self.listener.listen(2) # Allow two connections, for two controllers
@staticmethod
def pkgobj_to_version_str(pkg):
# Transform pkgobj version to format used by patch-controller
if pkg.epoch != 0:
output = "%s:%s-%s@%s" % (pkg.epoch, pkg.version, pkg.release, pkg.arch)
else:
output = "%s-%s@%s" % (pkg.version, pkg.release, pkg.arch)
return output
@staticmethod
def pkgobjs_to_list(pkgobjs):
# Transform pkgobj list to format used by patch-controller
output = {}
for pkg in pkgobjs:
output[pkg.name] = PatchAgent.pkgobj_to_version_str(pkg)
return output
def getOSTreeRevision(self, feed_type):
""" Get the ostree revision for a particular feed """
# todo(abailey): is a software version parameter required to support upgrade?
# Probably this method will invoke something like:
# ostree pull --commit-metadata-only --depth=1
LOG.info("Querying OSTree Revision from %s", feed_type)
return "UNDER CONSTRUCTION"
def query(self, check_revision=False):
def query(self):
""" Check current patch state """
if not check_install_uuid():
LOG.info("Failed install_uuid check. Skipping query")
return False
current_repo_revision = self.getOSTreeRevision('platform-updates')
if check_revision:
# The revision is a SHA for ostree
# todo(jcasteli): do we need check_revision
# since there is no caching or retry code
LOG.info("repo revision id: %s", current_repo_revision)
self.last_repo_revision = current_repo_revision
# Generate a unique query id
self.query_id = random.random()
# determine OSTREE state of the system and the patches
self.changes = False
active_commit, active_checksum = ostree_utils.get_sysroot_latest_commit()
patch_commit, patch_checksum = ostree_utils.get_feed_latest_commit(SW_VERSION)
active_sysroot_commit = ostree_utils.get_sysroot_latest_commit()
self.latest_sysroot_commit = active_sysroot_commit
self.last_repo_revision = active_sysroot_commit
if active_commit != patch_commit:
LOG.info("Active Commit:%s does not match Patch Commit %s", active_commit, patch_commit)
self.changes = True
# latest_feed_commit is sent from patch controller
if self.latest_feed_commit:
if active_sysroot_commit != self.latest_feed_commit:
LOG.info("Active Sysroot Commit:%s does not match "
"active controller's Feed Repo Commit: %s",
active_sysroot_commit, self.latest_feed_commit)
self.changes = True
active_deployment_commit = ostree_utils.get_latest_deployment_commit()
# strip off anything after a period. ex: 1234.1 becomes 1234
active_deployment_commit = active_deployment_commit.split(".")[0]
if active_checksum != patch_checksum:
LOG.info("Active Checksum:%s does not match Patch Checksum%s", active_checksum, patch_checksum)
if active_sysroot_commit != active_deployment_commit:
LOG.info("Active Sysroot Commit:%s does not match "
"Active Deployment Commit: %s",
active_sysroot_commit, active_deployment_commit)
self.changes = True
return True
@ -432,18 +419,35 @@ class PatchAgent(PatchService):
hello_ack = PatchMessageHelloAgentAck()
hello_ack.send(self.sock_out)
# Build up the install set
if verbose_to_stdout:
print("Checking for software updates...")
self.query() # sets self.changes
changed = False
sysroot_ostree = constants.SYSROOT_OSTREE
feed_ostree = "%s/rel-%s/ostree_repo" % (constants.FEED_OSTREE_BASE_DIR, SW_VERSION)
cmd = "ostree --repo=%s pull-local %s %s --depth=-1" % (sysroot_ostree, feed_ostree, constants.OSTREE_REF)
try:
subprocess.run(cmd, shell=True, check=True, capture_output=True)
changed = True
except subprocess.CalledProcessError as e:
LOG.exception("Failed to pull feed ostree in to the sysroot ostree.")
info_msg = "OSTree Reset Error: return code: %s , Output: %s" % (e.returncode, e.stderr.decode("utf-8"))
LOG.info(info_msg)
if self.changes:
cmd = "ostree --repo=%s pull-local %s %s --depth=-1" % (sysroot_ostree, feed_ostree, constants.OSTREE_REF)
try:
subprocess.run(cmd, shell=True, check=True, capture_output=True)
except subprocess.CalledProcessError as e:
LOG.exception("Failed to pull feed ostree in to the sysroot ostree.")
info_msg = "OSTree Pull Local Error: return code: %s , Output: %s" % (e.returncode, e.stderr.decode("utf-8"))
LOG.info(info_msg)
deployment_cmd = "ostree admin deploy %s" % constants.OSTREE_REF
try:
subprocess.run(deployment_cmd, shell=True, check=True, capture_output=True)
changed = True
except subprocess.CalledProcessError as e:
LOG.exception("Failed to create an ostree deployment.")
info_msg = "OSTree Deployment Error: return code: %s , Output: %s" % (e.returncode, e.stderr.decode("utf-8"))
LOG.info(info_msg)
self.query() # sets self.changes.. Should now be false.
if self.changes:
LOG.info("Installing the patch did not change the patch current status")
if changed:
# Update the node_is_patched flag
@ -522,7 +526,7 @@ class PatchAgent(PatchService):
changed = True
if changed:
rc = self.query(check_revision=True)
rc = self.query()
if not rc:
# Query failed. Reset the op counter
self.patch_op_counter = 0
@ -630,6 +634,8 @@ class PatchAgent(PatchService):
msg = PatchMessageHelloAgent()
elif msgdata['msgtype'] == messages.PATCHMSG_QUERY_DETAILED:
msg = PatchMessageQueryDetailed()
elif msgdata['msgtype'] == messages.PATCHMSG_SEND_LATEST_FEED_COMMIT:
msg = PatchMessageSendLatestFeedCommit()
elif msgdata['msgtype'] == messages.PATCHMSG_AGENT_INSTALL_REQ:
msg = PatchMessageAgentInstallReq()

View File

@ -112,10 +112,7 @@ class AgentNeighbour(object):
self.patch_failed = False
self.stale = False
self.pending_query = False
self.installed = {}
self.to_remove = []
self.missing_pkgs = []
self.duplicated_pkgs = {}
self.latest_sysroot_commit = None
self.nodetype = None
self.sw_version = "unknown"
self.subfunctions = []
@ -154,18 +151,12 @@ class AgentNeighbour(object):
return int(time.time() - self.last_ack)
def handle_query_detailed_resp(self,
installed,
to_remove,
missing_pkgs,
duplicated_pkgs,
latest_sysroot_commit,
nodetype,
sw_version,
subfunctions,
state):
self.installed = installed
self.to_remove = to_remove
self.missing_pkgs = missing_pkgs
self.duplicated_pkgs = duplicated_pkgs
self.latest_sysroot_commit = latest_sysroot_commit
self.nodetype = nodetype
self.stale = False
self.pending_query = False
@ -186,10 +177,7 @@ class AgentNeighbour(object):
"secs_since_ack": self.get_age(),
"patch_failed": self.patch_failed,
"stale_details": self.stale,
"installed": self.installed,
"to_remove": self.to_remove,
"missing_pkgs": self.missing_pkgs,
"duplicated_pkgs": self.duplicated_pkgs,
"latest_sysroot_commit": self.latest_sysroot_commit,
"nodetype": self.nodetype,
"subfunctions": self.subfunctions,
"sw_version": self.sw_version,
@ -348,6 +336,27 @@ class PatchMessageHelloAgent(messages.PatchMessage):
sock.sendto(str.encode(message), (local_hostname, cfg.agent_port))
class PatchMessageSendLatestFeedCommit(messages.PatchMessage):
    """
    Controller-side PATCHMSG_SEND_LATEST_FEED_COMMIT message.

    Carries ``pc.latest_feed_commit`` and is only ever sent from here; the
    handler logs an error if one is received.
    NOTE(review): ``pc`` is presumably the PatchController singleton - confirm.
    """

    def __init__(self):
        super().__init__(messages.PATCHMSG_SEND_LATEST_FEED_COMMIT)

    def encode(self):
        """ Encode the common fields and add the latest feed commit """
        super().encode()
        self.message['latest_feed_commit'] = pc.latest_feed_commit

    def handle(self, sock, addr):
        """ This message type is not expected to be received here """
        LOG.error("Should not get here")

    def send(self, sock):
        """
        Serialize the message as JSON and send it through ``sock`` to
        ``pc.agent_address`` and then to the versioned localhost address,
        both on ``cfg.agent_port``.
        """
        self.encode()
        serialized = json.dumps(self.message)
        localhost_address = utils.ip_to_versioned_localhost(cfg.agent_mcast_group)
        payload = serialized.encode()

        # Same datagram goes to the agent address first, then to localhost
        for destination in (pc.agent_address, localhost_address):
            sock.sendto(payload, (destination, cfg.agent_port))
class PatchMessageHelloAgentAck(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_AGENT_ACK)
@ -421,11 +430,7 @@ class PatchMessageQueryDetailedResp(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_QUERY_DETAILED_RESP)
self.agent_sw_version = "unknown"
self.installed = {}
self.to_install = {}
self.to_remove = []
self.missing_pkgs = []
self.duplicated_pkgs = {}
self.latest_sysroot_commit = "unknown"
self.subfunctions = []
self.nodetype = "unknown"
self.agent_sw_version = "unknown"
@ -433,14 +438,8 @@ class PatchMessageQueryDetailedResp(messages.PatchMessage):
def decode(self, data):
messages.PatchMessage.decode(self, data)
if 'installed' in data:
self.installed = data['installed']
if 'to_remove' in data:
self.to_remove = data['to_remove']
if 'missing_pkgs' in data:
self.missing_pkgs = data['missing_pkgs']
if 'duplicated_pkgs' in data:
self.duplicated_pkgs = data['duplicated_pkgs']
if 'latest_sysroot_commit' in data:
self.latest_sysroot_commit = data['latest_sysroot_commit']
if 'nodetype' in data:
self.nodetype = data['nodetype']
if 'sw_version' in data:
@ -459,10 +458,7 @@ class PatchMessageQueryDetailedResp(messages.PatchMessage):
ip = addr[0]
pc.hosts_lock.acquire()
if ip in pc.hosts:
pc.hosts[ip].handle_query_detailed_resp(self.installed,
self.to_remove,
self.missing_pkgs,
self.duplicated_pkgs,
pc.hosts[ip].handle_query_detailed_resp(self.latest_sysroot_commit,
self.nodetype,
self.agent_sw_version,
self.subfunctions,
@ -600,6 +596,7 @@ class PatchController(PatchService):
self.patch_op_counter = 1
self.patch_data = PatchData()
self.patch_data.load_all()
self.latest_feed_commit = None
self.check_patch_states()
self.base_pkgdata = BasePackageData()
@ -757,106 +754,12 @@ class PatchController(PatchService):
self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE
elif self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED:
self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY
if self.patch_data.metadata[patch_id].get("reboot_required") != "N":
self.allow_insvc_patching = False
else:
self.patch_data.metadata[patch_id]["patchstate"] = \
self.patch_data.metadata[patch_id]["repostate"]
for ip in list(self.hosts):
if not self.hosts[ip].out_of_date:
continue
# todo(jcasteli): the patch contents checks can be revisited
for pkg in list(self.hosts[ip].installed):
for patch_id in list(self.patch_data.contents):
if patch_id not in self.patch_data.metadata:
LOG.error("Patch data missing for %s", patch_id)
continue
# If the patch is on a different release than the host, skip it.
if self.patch_data.metadata[patch_id]["sw_version"] != self.hosts[ip].sw_version:
continue
# Is the installed pkg higher or lower version?
# OSTREE: this comparison will be different than for RPMs
installed_ver = self.hosts[ip].installed[pkg].split('@')[0]
if ":" in installed_ver:
# Ignore epoch
installed_ver = installed_ver.split(':')[1]
if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE:
# The RPM is not expected to be installed.
# If the installed version is the same or higher,
# this patch is in a Partial-Remove state
if patch_id in self.interim_state:
self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE
if self.patch_data.metadata[patch_id].get("reboot_required") != "N":
self.allow_insvc_patching = False
continue
elif self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED:
# The RPM is expected to be installed.
# If the installed version is the lower,
# this patch is in a Partial-Apply state
if patch_id in self.interim_state:
self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY
if self.patch_data.metadata[patch_id].get("reboot_required") != "N":
self.allow_insvc_patching = False
continue
# todo(jcasteli): patching no longer needs to verify personality
if self.hosts[ip].sw_version == "14.10":
# For Release 1
personality = "personality-%s" % self.hosts[ip].nodetype
else:
personality = "personality-%s" % "-".join(self.hosts[ip].subfunctions)
# Check the to_remove list
for pkg in self.hosts[ip].to_remove:
for patch_id in list(self.patch_data.contents):
if pkg not in self.patch_data.contents[patch_id]:
continue
if patch_id not in self.patch_data.metadata:
LOG.error("Patch data missing for %s", patch_id)
continue
if personality not in self.patch_data.metadata[patch_id]:
continue
if pkg not in self.patch_data.metadata[patch_id][personality]:
continue
if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE:
# The RPM is not expected to be installed.
# This patch is in a Partial-Remove state
self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE
if self.patch_data.metadata[patch_id].get("reboot_required") != "N":
self.allow_insvc_patching = False
continue
# Check the missing_pkgs list
for pkg in self.hosts[ip].missing_pkgs:
for patch_id in list(self.patch_data.contents):
if pkg not in self.patch_data.contents[patch_id]:
continue
if patch_id not in self.patch_data.metadata:
LOG.error("Patch data missing for %s", patch_id)
continue
if personality not in self.patch_data.metadata[patch_id]:
continue
if pkg not in self.patch_data.metadata[patch_id][personality]:
continue
if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED:
# The RPM is expected to be installed.
# This patch is in a Partial-Apply state
self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY
if self.patch_data.metadata[patch_id].get("reboot_required") != "N":
self.allow_insvc_patching = False
continue
self.hosts_lock.release()
def get_store_filename(self, patch_sw_version, contentname):
@ -1226,6 +1129,10 @@ class PatchController(PatchService):
else:
self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN
# Commit1 in patch metadata.xml file represents the latest commit
# after this patch has been applied to the feed repo
self.latest_feed_commit = self.patch_data.contents[patch_id]["commit1"]
self.hosts_lock.acquire()
self.interim_state[patch_id] = list(self.hosts)
self.hosts_lock.release()
@ -1390,6 +1297,10 @@ class PatchController(PatchService):
else:
self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN
# Base Commit in patch metadata.xml file represents the latest commit
# after this patch has been removed from the feed repo
self.latest_feed_commit = self.patch_data.contents[patch_id]["base"]["commit"]
self.hosts_lock.acquire()
self.interim_state[patch_id] = list(self.hosts)
self.hosts_lock.release()
@ -1672,6 +1583,16 @@ class PatchController(PatchService):
return dict(info=msg_info, warning=msg_warning, error=msg_error)
def send_latest_feed_commit_to_agent(self):
    """
    Notify the patch agent that the latest commit on the feed
    repo has been updated

    Builds a PatchMessageSendLatestFeedCommit and sends it through
    ``self.sock_out`` while holding ``self.socket_lock``.
    """
    send_commit_to_agent = PatchMessageSendLatestFeedCommit()
    self.socket_lock.acquire()
    try:
        send_commit_to_agent.send(self.sock_out)
    finally:
        # Always release the lock: if send() raises, a held socket_lock
        # would block every later sender that acquires it.
        self.socket_lock.release()
def patch_sync(self):
# Increment the patch_op_counter here
self.inc_patch_op_counter()

View File

@ -28,30 +28,11 @@ ${hostelem(host)}
${h["ip"]}
% endif
</ip>
<missing_pkgs>
% if "missing_pkgs" in h and len(h["missing_pkgs"]) > 0:
% for pkg in sorted(h["missing_pkgs"]):
<pkg>${pkg}</pkg>
% endfor
<latest_sysroot_commit>
% if h["latest_sysroot_commit"] != "":
${h["latest_sysroot_commit"]}
% endif
</missing_pkgs>
<installed>
% if "installed" in h and len(h["installed"]) > 0:
% for pkg in sorted(h["installed"]):
<pkg>
<name>${pkg}</name>
<pkgname>${h["installed"][pkg]}</pkgname>
</pkg>
% endfor
% endif
</installed>
<to_remove>
% if "to_remove" in h and len(h["to_remove"]) > 0:
% for pkg in sorted(h["to_remove"]):
<pkg>${pkg}</pkg>
% endfor
% endif
</to_remove>
</latest_sysroot_commit>
<secs_since_ack>
% if h["secs_since_ack"] != "":
${h["secs_since_ack"]}
@ -72,4 +53,4 @@ ${hostelem(host)}
${h["patch_current"]}
% endif
</patch_current>
</host></%def>
</host></%def>