""" Copyright (c) 2014-2019 Wind River Systems, Inc. SPDX-License-Identifier: Apache-2.0 """ import shutil import tempfile import threading import time import socket import json import select import subprocess import six from six.moves import configparser import rpm import os import gc from cgcs_patch.patch_functions import parse_pkgver from wsgiref import simple_server from cgcs_patch.api import app from cgcs_patch.authapi import app as auth_app from cgcs_patch.patch_functions import configure_logging from cgcs_patch.patch_functions import BasePackageData from cgcs_patch.patch_functions import avail_dir from cgcs_patch.patch_functions import applied_dir from cgcs_patch.patch_functions import committed_dir from cgcs_patch.patch_functions import PatchFile from cgcs_patch.patch_functions import parse_rpm_filename from cgcs_patch.patch_functions import package_dir from cgcs_patch.patch_functions import repo_dir from cgcs_patch.patch_functions import semantics_dir from cgcs_patch.patch_functions import SW_VERSION from cgcs_patch.patch_functions import root_package_dir from cgcs_patch.exceptions import MetadataFail from cgcs_patch.exceptions import RpmFail from cgcs_patch.exceptions import SemanticFail from cgcs_patch.exceptions import PatchError from cgcs_patch.exceptions import PatchFail from cgcs_patch.exceptions import PatchInvalidRequest from cgcs_patch.exceptions import PatchValidationFailure from cgcs_patch.exceptions import PatchMismatchFailure from cgcs_patch.patch_functions import LOG from cgcs_patch.patch_functions import audit_log_info from cgcs_patch.patch_functions import patch_dir from cgcs_patch.patch_functions import repo_root_dir from cgcs_patch.patch_functions import PatchData from cgcs_patch.base import PatchService import cgcs_patch.config as cfg import cgcs_patch.utils as utils # noinspection PyUnresolvedReferences from oslo_config import cfg as oslo_cfg import cgcs_patch.messages as messages import cgcs_patch.constants as constants from tsconfig.tsconfig import INITIAL_CONFIG_COMPLETE_FLAG CONF = oslo_cfg.CONF pidfile_path = "/var/run/patch_controller.pid" pc = None state_file = "%s/.controller.state" % constants.PATCH_STORAGE_DIR app_dependency_basename = "app_dependencies.json" app_dependency_filename = "%s/%s" % (constants.PATCH_STORAGE_DIR, app_dependency_basename) insvc_patch_restart_controller = "/run/patching/.restart.patch-controller" stale_hosts = [] pending_queries = [] thread_death = None keep_running = True # Limit socket blocking to 5 seconds to allow for thread to shutdown api_socket_timeout = 5.0 class ControllerNeighbour(object): def __init__(self): self.last_ack = 0 self.synced = False def rx_ack(self): self.last_ack = time.time() def get_age(self): return int(time.time() - self.last_ack) def rx_synced(self): self.synced = True def clear_synced(self): self.synced = False def get_synced(self): return self.synced class AgentNeighbour(object): def __init__(self, ip): self.ip = ip self.last_ack = 0 self.last_query_id = 0 self.out_of_date = False self.hostname = "n/a" self.requires_reboot = False self.patch_failed = False self.stale = False self.pending_query = False self.installed = {} self.to_remove = [] self.missing_pkgs = [] self.duplicated_pkgs = {} self.nodetype = None self.sw_version = "unknown" self.subfunctions = [] self.state = None def rx_ack(self, hostname, out_of_date, requires_reboot, query_id, patch_failed, sw_version, state): self.last_ack = time.time() self.hostname = hostname self.patch_failed = patch_failed self.sw_version = sw_version self.state = state if out_of_date != self.out_of_date or requires_reboot != self.requires_reboot: self.out_of_date = out_of_date self.requires_reboot = requires_reboot LOG.info("Agent %s (%s) reporting out_of_date=%s, requires_reboot=%s", self.hostname, self.ip, self.out_of_date, self.requires_reboot) if self.last_query_id != query_id: self.last_query_id = query_id self.stale = True if self.ip not in stale_hosts and self.ip not in pending_queries: stale_hosts.append(self.ip) def get_age(self): return int(time.time() - self.last_ack) def handle_query_detailed_resp(self, installed, to_remove, missing_pkgs, duplicated_pkgs, nodetype, sw_version, subfunctions, state): self.installed = installed self.to_remove = to_remove self.missing_pkgs = missing_pkgs self.duplicated_pkgs = duplicated_pkgs self.nodetype = nodetype self.stale = False self.pending_query = False self.sw_version = sw_version self.subfunctions = subfunctions self.state = state if self.ip in pending_queries: pending_queries.remove(self.ip) if self.ip in stale_hosts: stale_hosts.remove(self.ip) def get_dict(self): d = {"ip": self.ip, "hostname": self.hostname, "patch_current": not self.out_of_date, "secs_since_ack": self.get_age(), "patch_failed": self.patch_failed, "stale_details": self.stale, "installed": self.installed, "to_remove": self.to_remove, "missing_pkgs": self.missing_pkgs, "duplicated_pkgs": self.duplicated_pkgs, "nodetype": self.nodetype, "subfunctions": self.subfunctions, "sw_version": self.sw_version, "state": self.state} global pc if self.out_of_date and not pc.allow_insvc_patching: d["requires_reboot"] = True else: d["requires_reboot"] = self.requires_reboot # Included for future enhancement, to allow per-node determination # of in-service patching d["allow_insvc_patching"] = pc.allow_insvc_patching return d class PatchMessageHello(messages.PatchMessage): def __init__(self): messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO) self.patch_op_counter = 0 def decode(self, data): messages.PatchMessage.decode(self, data) if 'patch_op_counter' in data: self.patch_op_counter = data['patch_op_counter'] def encode(self): global pc messages.PatchMessage.encode(self) self.message['patch_op_counter'] = pc.patch_op_counter def handle(self, sock, addr): global pc host = addr[0] if host == cfg.get_mgmt_ip(): # Ignore messages from self return # Send response if self.patch_op_counter > 0: pc.handle_nbr_patch_op_counter(host, self.patch_op_counter) resp = PatchMessageHelloAck() resp.send(sock) def send(self, sock): global pc self.encode() message = json.dumps(self.message) sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port)) class PatchMessageHelloAck(messages.PatchMessage): def __init__(self): messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_ACK) def encode(self): # Nothing to add, so just call the super class messages.PatchMessage.encode(self) def handle(self, sock, addr): global pc pc.controller_neighbours_lock.acquire() if not addr[0] in pc.controller_neighbours: pc.controller_neighbours[addr[0]] = ControllerNeighbour() pc.controller_neighbours[addr[0]].rx_ack() pc.controller_neighbours_lock.release() def send(self, sock): global pc self.encode() message = json.dumps(self.message) sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port)) class PatchMessageSyncReq(messages.PatchMessage): def __init__(self): messages.PatchMessage.__init__(self, messages.PATCHMSG_SYNC_REQ) def encode(self): # Nothing to add to the SYNC_REQ, so just call the super class messages.PatchMessage.encode(self) def handle(self, sock, addr): global pc host = addr[0] if host == cfg.get_mgmt_ip(): # Ignore messages from self return # We may need to do this in a separate thread, so that we continue to process hellos LOG.info("Handling sync req") pc.sync_from_nbr(host) resp = PatchMessageSyncComplete() resp.send(sock) def send(self, sock): global pc LOG.info("sending sync req") self.encode() message = json.dumps(self.message) sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port)) class PatchMessageSyncComplete(messages.PatchMessage): def __init__(self): messages.PatchMessage.__init__(self, messages.PATCHMSG_SYNC_COMPLETE) def encode(self): # Nothing to add to the SYNC_COMPLETE, so just call the super class messages.PatchMessage.encode(self) def handle(self, sock, addr): global pc LOG.info("Handling sync complete") pc.controller_neighbours_lock.acquire() if not addr[0] in pc.controller_neighbours: pc.controller_neighbours[addr[0]] = ControllerNeighbour() pc.controller_neighbours[addr[0]].rx_synced() pc.controller_neighbours_lock.release() def send(self, sock): global pc LOG.info("sending sync complete") self.encode() message = json.dumps(self.message) sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port)) class PatchMessageHelloAgent(messages.PatchMessage): def __init__(self): messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_AGENT) def encode(self): global pc messages.PatchMessage.encode(self) self.message['patch_op_counter'] = pc.patch_op_counter def handle(self, sock, addr): LOG.error("Should not get here") def send(self, sock): global pc self.encode() message = json.dumps(self.message) local_hostname = utils.ip_to_versioned_localhost(cfg.agent_mcast_group) sock.sendto(str.encode(message), (pc.agent_address, cfg.agent_port)) sock.sendto(str.encode(message), (local_hostname, cfg.agent_port)) class PatchMessageHelloAgentAck(messages.PatchMessage): def __init__(self): messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_AGENT_ACK) self.query_id = 0 self.agent_out_of_date = False self.agent_hostname = "n/a" self.agent_requires_reboot = False self.agent_patch_failed = False self.agent_sw_version = "unknown" self.agent_state = "unknown" def decode(self, data): messages.PatchMessage.decode(self, data) if 'query_id' in data: self.query_id = data['query_id'] if 'out_of_date' in data: self.agent_out_of_date = data['out_of_date'] if 'hostname' in data: self.agent_hostname = data['hostname'] if 'requires_reboot' in data: self.agent_requires_reboot = data['requires_reboot'] if 'patch_failed' in data: self.agent_patch_failed = data['patch_failed'] if 'sw_version' in data: self.agent_sw_version = data['sw_version'] if 'state' in data: self.agent_state = data['state'] def encode(self): # Nothing to add, so just call the super class messages.PatchMessage.encode(self) def handle(self, sock, addr): global pc pc.hosts_lock.acquire() if not addr[0] in pc.hosts: pc.hosts[addr[0]] = AgentNeighbour(addr[0]) pc.hosts[addr[0]].rx_ack(self.agent_hostname, self.agent_out_of_date, self.agent_requires_reboot, self.query_id, self.agent_patch_failed, self.agent_sw_version, self.agent_state) pc.hosts_lock.release() def send(self, sock): # pylint: disable=unused-argument LOG.error("Should not get here") class PatchMessageQueryDetailed(messages.PatchMessage): def __init__(self): messages.PatchMessage.__init__(self, messages.PATCHMSG_QUERY_DETAILED) def encode(self): # Nothing to add to the message, so just call the super class messages.PatchMessage.encode(self) def handle(self, sock, addr): LOG.error("Should not get here") def send(self, sock): self.encode() message = json.dumps(self.message) sock.sendall(str.encode(message)) class PatchMessageQueryDetailedResp(messages.PatchMessage): def __init__(self): messages.PatchMessage.__init__(self, messages.PATCHMSG_QUERY_DETAILED_RESP) self.agent_sw_version = "unknown" self.installed = {} self.to_install = {} self.to_remove = [] self.missing_pkgs = [] self.duplicated_pkgs = {} self.subfunctions = [] self.nodetype = "unknown" self.agent_sw_version = "unknown" self.agent_state = "unknown" def decode(self, data): messages.PatchMessage.decode(self, data) if 'installed' in data: self.installed = data['installed'] if 'to_remove' in data: self.to_remove = data['to_remove'] if 'missing_pkgs' in data: self.missing_pkgs = data['missing_pkgs'] if 'duplicated_pkgs' in data: self.duplicated_pkgs = data['duplicated_pkgs'] if 'nodetype' in data: self.nodetype = data['nodetype'] if 'sw_version' in data: self.agent_sw_version = data['sw_version'] if 'subfunctions' in data: self.subfunctions = data['subfunctions'] if 'state' in data: self.agent_state = data['state'] def encode(self): LOG.error("Should not get here") def handle(self, sock, addr): global pc ip = addr[0] pc.hosts_lock.acquire() if ip in pc.hosts: pc.hosts[ip].handle_query_detailed_resp(self.installed, self.to_remove, self.missing_pkgs, self.duplicated_pkgs, self.nodetype, self.agent_sw_version, self.subfunctions, self.agent_state) for patch_id in list(pc.interim_state): if ip in pc.interim_state[patch_id]: pc.interim_state[patch_id].remove(ip) if len(pc.interim_state[patch_id]) == 0: del pc.interim_state[patch_id] pc.hosts_lock.release() pc.check_patch_states() else: pc.hosts_lock.release() def send(self, sock): # pylint: disable=unused-argument LOG.error("Should not get here") class PatchMessageAgentInstallReq(messages.PatchMessage): def __init__(self): messages.PatchMessage.__init__(self, messages.PATCHMSG_AGENT_INSTALL_REQ) self.ip = None self.force = False def encode(self): global pc messages.PatchMessage.encode(self) self.message['force'] = self.force def handle(self, sock, addr): LOG.error("Should not get here") def send(self, sock): LOG.info("sending install request to node: %s", self.ip) self.encode() message = json.dumps(self.message) sock.sendto(str.encode(message), (self.ip, cfg.agent_port)) class PatchMessageAgentInstallResp(messages.PatchMessage): def __init__(self): messages.PatchMessage.__init__(self, messages.PATCHMSG_AGENT_INSTALL_RESP) self.status = False self.reject_reason = None def decode(self, data): messages.PatchMessage.decode(self, data) if 'status' in data: self.status = data['status'] if 'reject_reason' in data: self.reject_reason = data['reject_reason'] def encode(self): # Nothing to add, so just call the super class messages.PatchMessage.encode(self) def handle(self, sock, addr): LOG.info("Handling install resp from %s", addr[0]) global pc # LOG.info("Handling hello ack") pc.hosts_lock.acquire() if not addr[0] in pc.hosts: pc.hosts[addr[0]] = AgentNeighbour(addr[0]) pc.hosts[addr[0]].install_status = self.status pc.hosts[addr[0]].install_pending = False pc.hosts[addr[0]].install_reject_reason = self.reject_reason pc.hosts_lock.release() def send(self, sock): # pylint: disable=unused-argument LOG.error("Should not get here") class PatchMessageDropHostReq(messages.PatchMessage): def __init__(self): messages.PatchMessage.__init__(self, messages.PATCHMSG_DROP_HOST_REQ) self.ip = None def encode(self): messages.PatchMessage.encode(self) self.message['ip'] = self.ip def decode(self, data): messages.PatchMessage.decode(self, data) if 'ip' in data: self.ip = data['ip'] def handle(self, sock, addr): global pc host = addr[0] if host == cfg.get_mgmt_ip(): # Ignore messages from self return if self.ip is None: LOG.error("Received PATCHMSG_DROP_HOST_REQ with no ip: %s", json.dumps(self.data)) return pc.drop_host(self.ip, sync_nbr=False) return def send(self, sock): global pc self.encode() message = json.dumps(self.message) sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port)) class PatchController(PatchService): def __init__(self): PatchService.__init__(self) # Locks self.socket_lock = threading.RLock() self.controller_neighbours_lock = threading.RLock() self.hosts_lock = threading.RLock() self.patch_data_lock = threading.RLock() self.hosts = {} self.controller_neighbours = {} # interim_state is used to track hosts that have not responded # with fresh queries since a patch was applied or removed, on # a per-patch basis. This allows the patch controller to move # patches immediately into a "Partial" state until all nodes # have responded. # self.interim_state = {} self.sock_out = None self.sock_in = None self.controller_address = None self.agent_address = None self.patch_op_counter = 1 self.patch_data = PatchData() self.patch_data.load_all() self.check_patch_states() self.base_pkgdata = BasePackageData() self.allow_insvc_patching = True if os.path.exists(app_dependency_filename): try: with open(app_dependency_filename, 'r') as f: self.app_dependencies = json.loads(f.read()) except Exception: LOG.exception("Failed to read app dependencies: %s", app_dependency_filename) else: self.app_dependencies = {} if os.path.isfile(state_file): self.read_state_file() else: self.write_state_file() def update_config(self): cfg.read_config() if self.port != cfg.controller_port: self.port = cfg.controller_port # Loopback interface does not support multicast messaging, therefore # revert to using unicast messaging when configured against the # loopback device if cfg.get_mgmt_iface() == constants.LOOPBACK_INTERFACE_NAME: mgmt_ip = cfg.get_mgmt_ip() self.mcast_addr = None self.controller_address = mgmt_ip self.agent_address = mgmt_ip else: self.mcast_addr = cfg.controller_mcast_group self.controller_address = cfg.controller_mcast_group self.agent_address = cfg.agent_mcast_group def socket_lock_acquire(self): self.socket_lock.acquire() def socket_lock_release(self): try: self.socket_lock.release() except Exception: pass def write_state_file(self): if six.PY2: config = configparser.ConfigParser() elif six.PY3: config = configparser.ConfigParser(strict=False) cfgfile = open(state_file, 'w') config.add_section('runtime') config.set('runtime', 'patch_op_counter', str(self.patch_op_counter)) config.write(cfgfile) cfgfile.close() def read_state_file(self): if six.PY2: config = configparser.ConfigParser() elif six.PY3: config = configparser.ConfigParser(strict=False) config.read(state_file) try: counter = config.getint('runtime', 'patch_op_counter') self.patch_op_counter = counter LOG.info("patch_op_counter is: %d", self.patch_op_counter) except configparser.Error: LOG.exception("Failed to read state info") def handle_nbr_patch_op_counter(self, host, nbr_patch_op_counter): if self.patch_op_counter >= nbr_patch_op_counter: return self.sync_from_nbr(host) def sync_from_nbr(self, host): # Sync the patching repo host_url = utils.ip_to_url(host) try: output = subprocess.check_output(["rsync", "-acv", "--delete", "--exclude", "tmp", "rsync://%s/patching/" % host_url, "%s/" % patch_dir], stderr=subprocess.STDOUT) LOG.info("Synced to mate patching via rsync: %s", output) except subprocess.CalledProcessError as e: LOG.error("Failed to rsync: %s", e.output) return False try: output = subprocess.check_output(["rsync", "-acv", "--delete", "rsync://%s/repo/" % host_url, "%s/" % repo_root_dir], stderr=subprocess.STDOUT) LOG.info("Synced to mate repo via rsync: %s", output) except subprocess.CalledProcessError: LOG.error("Failed to rsync: %s", output) return False self.read_state_file() self.patch_data_lock.acquire() self.hosts_lock.acquire() self.interim_state = {} self.patch_data.load_all() self.check_patch_states() self.hosts_lock.release() if os.path.exists(app_dependency_filename): try: with open(app_dependency_filename, 'r') as f: self.app_dependencies = json.loads(f.read()) except Exception: LOG.exception("Failed to read app dependencies: %s", app_dependency_filename) else: self.app_dependencies = {} self.patch_data_lock.release() return True def inc_patch_op_counter(self): self.patch_op_counter += 1 self.write_state_file() def check_patch_states(self): # If we have no hosts, we can't be sure of the current patch state if len(self.hosts) == 0: for patch_id in self.patch_data.metadata: self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN return # Default to allowing in-service patching self.allow_insvc_patching = True # Take the detailed query results from the hosts and merge with the patch data self.hosts_lock.acquire() # Initialize patch state data based on repo state and interim_state presence for patch_id in self.patch_data.metadata: if patch_id in self.interim_state: if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE: self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE elif self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED: self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY else: self.patch_data.metadata[patch_id]["patchstate"] = \ self.patch_data.metadata[patch_id]["repostate"] for ip in list(self.hosts): if not self.hosts[ip].out_of_date: continue for pkg in list(self.hosts[ip].installed): for patch_id in list(self.patch_data.content_versions): if pkg not in self.patch_data.content_versions[patch_id]: continue if patch_id not in self.patch_data.metadata: LOG.error("Patch data missing for %s", patch_id) continue # If the patch is on a different release than the host, skip it. if self.patch_data.metadata[patch_id]["sw_version"] != self.hosts[ip].sw_version: continue # Is the installed pkg higher or lower version? # The rpm.labelCompare takes version broken into 3 components installed_ver = self.hosts[ip].installed[pkg].split('@')[0] if ":" in installed_ver: # Ignore epoch installed_ver = installed_ver.split(':')[1] patch_ver = self.patch_data.content_versions[patch_id][pkg] if ":" in patch_ver: # Ignore epoch patch_ver = patch_ver.split(':')[1] rc = rpm.labelCompare(parse_pkgver(installed_ver), parse_pkgver(patch_ver)) if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE: # The RPM is not expected to be installed. # If the installed version is the same or higher, # this patch is in a Partial-Remove state if rc >= 0 or patch_id in self.interim_state: self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE if self.patch_data.metadata[patch_id].get("reboot_required") != "N": self.allow_insvc_patching = False continue elif self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED: # The RPM is expected to be installed. # If the installed version is the lower, # this patch is in a Partial-Apply state if rc == -1 or patch_id in self.interim_state: self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY if self.patch_data.metadata[patch_id].get("reboot_required") != "N": self.allow_insvc_patching = False continue if self.hosts[ip].sw_version == "14.10": # For Release 1 personality = "personality-%s" % self.hosts[ip].nodetype else: personality = "personality-%s" % "-".join(self.hosts[ip].subfunctions) # Check the to_remove list for pkg in self.hosts[ip].to_remove: for patch_id in list(self.patch_data.content_versions): if pkg not in self.patch_data.content_versions[patch_id]: continue if patch_id not in self.patch_data.metadata: LOG.error("Patch data missing for %s", patch_id) continue if personality not in self.patch_data.metadata[patch_id]: continue if pkg not in self.patch_data.metadata[patch_id][personality]: continue if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE: # The RPM is not expected to be installed. # This patch is in a Partial-Remove state self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE if self.patch_data.metadata[patch_id].get("reboot_required") != "N": self.allow_insvc_patching = False continue # Check the missing_pkgs list for pkg in self.hosts[ip].missing_pkgs: for patch_id in list(self.patch_data.content_versions): if pkg not in self.patch_data.content_versions[patch_id]: continue if patch_id not in self.patch_data.metadata: LOG.error("Patch data missing for %s", patch_id) continue if personality not in self.patch_data.metadata[patch_id]: continue if pkg not in self.patch_data.metadata[patch_id][personality]: continue if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED: # The RPM is expected to be installed. # This patch is in a Partial-Apply state self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY if self.patch_data.metadata[patch_id].get("reboot_required") != "N": self.allow_insvc_patching = False continue self.hosts_lock.release() def get_store_filename(self, patch_sw_version, rpmname): rpm_dir = package_dir[patch_sw_version] rpmfile = "%s/%s" % (rpm_dir, rpmname) return rpmfile def get_repo_filename(self, patch_sw_version, rpmname): rpmfile = self.get_store_filename(patch_sw_version, rpmname) if not os.path.isfile(rpmfile): msg = "Could not find rpm: %s" % rpmfile LOG.error(msg) return None repo_filename = None try: # Get the architecture from the RPM pkgarch = subprocess.check_output(["rpm", "-qp", "--queryformat", "%{ARCH}", "--nosignature", rpmfile]) repo_filename = "%s/Packages/%s/%s" % (repo_dir[patch_sw_version], pkgarch, rpmname) except subprocess.CalledProcessError: msg = "RPM query failed for %s" % rpmfile LOG.exception(msg) return None return repo_filename def run_semantic_check(self, action, patch_list): if not os.path.exists(INITIAL_CONFIG_COMPLETE_FLAG): # Skip semantic checks if initial configuration isn't complete return # Pass the current patch state to the semantic check as a series of args patch_state_args = [] for patch_id in list(self.patch_data.metadata): patch_state = '%s=%s' % (patch_id, self.patch_data.metadata[patch_id]["patchstate"]) patch_state_args += ['-p', patch_state] # Run semantic checks, if any for patch_id in patch_list: semchk = os.path.join(semantics_dir, action, patch_id) if os.path.exists(semchk): try: LOG.info("Running semantic check: %s", semchk) subprocess.check_output([semchk] + patch_state_args, stderr=subprocess.STDOUT) LOG.info("Semantic check %s passed", semchk) except subprocess.CalledProcessError as e: msg = "Semantic check failed for %s:\n%s" % (patch_id, e.output) LOG.exception(msg) raise PatchFail(msg) def patch_import_api(self, patches): """ Import patches :return: """ msg_info = "" msg_warning = "" msg_error = "" # Refresh data, if needed self.base_pkgdata.loaddirs() # Protect against duplications patch_list = sorted(list(set(patches))) # First, make sure the specified files exist for patch in patch_list: if not os.path.isfile(patch): raise PatchFail("File does not exist: %s" % patch) try: if not os.path.exists(avail_dir): os.makedirs(avail_dir) if not os.path.exists(applied_dir): os.makedirs(applied_dir) if not os.path.exists(committed_dir): os.makedirs(committed_dir) except os.error: msg = "Failed to create directories" LOG.exception(msg) raise PatchFail(msg) msg = "Importing patches: %s" % ",".join(patch_list) LOG.info(msg) audit_log_info(msg) for patch in patch_list: msg = "Importing patch: %s" % patch LOG.info(msg) audit_log_info(msg) # Get the patch_id from the filename # and check to see if it's already imported (patch_id, ext) = os.path.splitext(os.path.basename(patch)) if patch_id in self.patch_data.metadata: if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED: mdir = applied_dir elif self.patch_data.metadata[patch_id]["repostate"] == constants.COMMITTED: msg = "%s is committed. Metadata not updated" % patch_id LOG.info(msg) msg_info += msg + "\n" continue else: mdir = avail_dir try: thispatch = PatchFile.extract_patch(patch, metadata_dir=mdir, metadata_only=True, existing_content=self.patch_data.contents[patch_id], allpatches=self.patch_data, base_pkgdata=self.base_pkgdata) self.patch_data.update_patch(thispatch) msg = "%s is already imported. Updated metadata only" % patch_id LOG.info(msg) msg_info += msg + "\n" except PatchMismatchFailure: msg = "Contents of %s do not match re-imported patch" % patch_id LOG.exception(msg) msg_error += msg + "\n" continue except PatchValidationFailure as e: msg = "Patch validation failed for %s" % patch_id if str(e) is not None and str(e) != '': msg += ":\n%s" % str(e) LOG.exception(msg) msg_error += msg + "\n" continue except PatchFail: msg = "Failed to import patch %s" % patch_id LOG.exception(msg) msg_error += msg + "\n" continue if ext != ".patch": msg = "File must end in .patch extension: %s" \ % os.path.basename(patch) LOG.exception(msg) msg_error += msg + "\n" continue try: thispatch = PatchFile.extract_patch(patch, metadata_dir=avail_dir, allpatches=self.patch_data, base_pkgdata=self.base_pkgdata) msg_info += "%s is now available\n" % patch_id self.patch_data.add_patch(patch_id, thispatch) self.patch_data.metadata[patch_id]["repostate"] = constants.AVAILABLE if len(self.hosts) > 0: self.patch_data.metadata[patch_id]["patchstate"] = constants.AVAILABLE else: self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN except PatchValidationFailure as e: msg = "Patch validation failed for %s" % patch_id if str(e) is not None and str(e) != '': msg += ":\n%s" % str(e) LOG.exception(msg) msg_error += msg + "\n" continue except PatchFail: msg = "Failed to import patch %s" % patch_id LOG.exception(msg) msg_error += msg + "\n" continue return dict(info=msg_info, warning=msg_warning, error=msg_error) def patch_apply_api(self, patch_ids, **kwargs): """ Apply patches, moving patches from available to applied and updating repo :return: """ msg_info = "" msg_warning = "" msg_error = "" # Protect against duplications patch_list = sorted(list(set(patch_ids))) msg = "Applying patches: %s" % ",".join(patch_list) LOG.info(msg) audit_log_info(msg) if "--all" in patch_list: # Set patch_ids to list of all available patches # We're getting this list now, before we load the applied patches patch_list = [] for patch_id in sorted(list(self.patch_data.metadata)): if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE: patch_list.append(patch_id) if len(patch_list) == 0: msg_info += "There are no available patches to be applied.\n" return dict(info=msg_info, warning=msg_warning, error=msg_error) repo_changed = False # First, verify that all specified patches exist id_verification = True for patch_id in patch_list: if patch_id not in self.patch_data.metadata: msg = "Patch %s does not exist" % patch_id LOG.error(msg) msg_error += msg + "\n" id_verification = False if not id_verification: return dict(info=msg_info, warning=msg_warning, error=msg_error) # Check for patches that can't be applied during an upgrade upgrade_check = True for patch_id in patch_list: if self.patch_data.metadata[patch_id]["sw_version"] != SW_VERSION \ and self.patch_data.metadata[patch_id].get("apply_active_release_only") == "Y": msg = "%s cannot be applied in an upgrade" % patch_id LOG.error(msg) msg_error += msg + "\n" upgrade_check = False if not upgrade_check: return dict(info=msg_info, warning=msg_warning, error=msg_error) # Next, check the patch dependencies # required_patches will map the required patch to the patches that need it required_patches = {} for patch_id in patch_list: for req_patch in self.patch_data.metadata[patch_id]["requires"]: # Ignore patches in the op set if req_patch in patch_list: continue if req_patch not in required_patches: required_patches[req_patch] = [] required_patches[req_patch].append(patch_id) # Now verify the state of the required patches req_verification = True for req_patch, iter_patch_list in required_patches.items(): if req_patch not in self.patch_data.metadata \ or self.patch_data.metadata[req_patch]["repostate"] == constants.AVAILABLE: msg = "%s is required by: %s" % (req_patch, ", ".join(sorted(iter_patch_list))) msg_error += msg + "\n" LOG.info(msg) req_verification = False if not req_verification: return dict(info=msg_info, warning=msg_warning, error=msg_error) if kwargs.get("skip-semantic") != "yes": self.run_semantic_check(constants.SEMANTIC_PREAPPLY, patch_list) # Start applying the patches for patch_id in patch_list: msg = "Applying patch: %s" % patch_id LOG.info(msg) audit_log_info(msg) if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED \ or self.patch_data.metadata[patch_id]["repostate"] == constants.COMMITTED: msg = "%s is already in the repo" % patch_id LOG.info(msg) msg_info += msg + "\n" continue # To allow for easy cleanup, we're going to first iterate # through the rpm list to determine where to copy the file. # As a second step, we'll go through the list and copy each file. # If there are problems querying any RPMs, none will be copied. rpmlist = {} for rpmname in self.patch_data.contents[patch_id]: patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"] rpmfile = self.get_store_filename(patch_sw_version, rpmname) if not os.path.isfile(rpmfile): msg = "Could not find rpm: %s" % rpmfile LOG.error(msg) raise RpmFail(msg) repo_filename = self.get_repo_filename(patch_sw_version, rpmname) if repo_filename is None: msg = "Failed to determine repo path for %s" % rpmfile LOG.exception(msg) raise RpmFail(msg) repo_pkg_dir = os.path.dirname(repo_filename) if not os.path.exists(repo_pkg_dir): os.makedirs(repo_pkg_dir) rpmlist[rpmfile] = repo_filename # Copy the RPMs. If a failure occurs, clean up copied files. copied = [] for rpmfile in rpmlist: LOG.info("Copy %s to %s", rpmfile, rpmlist[rpmfile]) try: shutil.copy(rpmfile, rpmlist[rpmfile]) copied.append(rpmlist[rpmfile]) except IOError: msg = "Failed to copy %s" % rpmfile LOG.exception(msg) # Clean up files for filename in copied: LOG.info("Cleaning up %s", filename) os.remove(filename) raise RpmFail(msg) try: # Move the metadata to the applied dir shutil.move("%s/%s-metadata.xml" % (avail_dir, patch_id), "%s/%s-metadata.xml" % (applied_dir, patch_id)) msg_info += "%s is now in the repo\n" % patch_id except shutil.Error: msg = "Failed to move the metadata for %s" % patch_id LOG.exception(msg) raise MetadataFail(msg) self.patch_data.metadata[patch_id]["repostate"] = constants.APPLIED if len(self.hosts) > 0: self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY else: self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN self.hosts_lock.acquire() self.interim_state[patch_id] = list(self.hosts) self.hosts_lock.release() repo_changed = True if repo_changed: # Update the repo self.patch_data.gen_groups_xml() for ver, rdir in repo_dir.items(): try: output = subprocess.check_output(["createrepo", "--update", "-g", "comps.xml", rdir], stderr=subprocess.STDOUT) LOG.info("Repo[%s] updated:\n%s", ver, output) except subprocess.CalledProcessError: msg = "Failed to update the repo for %s" % ver LOG.exception(msg) raise PatchFail(msg) else: LOG.info("Repository is unchanged") return dict(info=msg_info, warning=msg_warning, error=msg_error) def patch_remove_api(self, patch_ids, **kwargs): """ Remove patches, moving patches from applied to available and updating repo :return: """ msg_info = "" msg_warning = "" msg_error = "" remove_unremovable = False repo_changed = False # Protect against duplications patch_list = sorted(list(set(patch_ids))) msg = "Removing patches: %s" % ",".join(patch_list) LOG.info(msg) audit_log_info(msg) if kwargs.get("removeunremovable") == "yes": remove_unremovable = True # First, verify that all specified patches exist id_verification = True for patch_id in patch_list: if patch_id not in self.patch_data.metadata: msg = "Patch %s does not exist" % patch_id LOG.error(msg) msg_error += msg + "\n" id_verification = False if not id_verification: return dict(info=msg_info, warning=msg_warning, error=msg_error) # See if any of the patches are marked as unremovable unremovable_verification = True for patch_id in patch_list: if self.patch_data.metadata[patch_id].get("unremovable") == "Y": if remove_unremovable: msg = "Unremovable patch %s being removed" % patch_id LOG.warning(msg) msg_warning += msg + "\n" else: msg = "Patch %s is not removable" % patch_id LOG.error(msg) msg_error += msg + "\n" unremovable_verification = False elif self.patch_data.metadata[patch_id]['repostate'] == constants.COMMITTED: msg = "Patch %s is committed and cannot be removed" % patch_id LOG.error(msg) msg_error += msg + "\n" unremovable_verification = False if not unremovable_verification: return dict(info=msg_info, warning=msg_warning, error=msg_error) # Next, see if any of the patches are required by applied patches # required_patches will map the required patch to the patches that need it required_patches = {} for patch_iter in list(self.patch_data.metadata): # Ignore patches in the op set if patch_iter in patch_list: continue # Only check applied patches if self.patch_data.metadata[patch_iter]["repostate"] == constants.AVAILABLE: continue for req_patch in self.patch_data.metadata[patch_iter]["requires"]: if req_patch not in patch_list: continue if req_patch not in required_patches: required_patches[req_patch] = [] required_patches[req_patch].append(patch_iter) if len(required_patches) > 0: for req_patch, iter_patch_list in required_patches.items(): msg = "%s is required by: %s" % (req_patch, ", ".join(sorted(iter_patch_list))) msg_error += msg + "\n" LOG.info(msg) return dict(info=msg_info, warning=msg_warning, error=msg_error) if kwargs.get("skipappcheck") != "yes": # Check application dependencies before removing required_patches = {} for patch_id in patch_list: for appname, iter_patch_list in self.app_dependencies.items(): if patch_id in iter_patch_list: if patch_id not in required_patches: required_patches[patch_id] = [] required_patches[patch_id].append(appname) if len(required_patches) > 0: for req_patch, app_list in required_patches.items(): msg = "%s is required by application(s): %s" % (req_patch, ", ".join(sorted(app_list))) msg_error += msg + "\n" LOG.info(msg) return dict(info=msg_info, warning=msg_warning, error=msg_error) if kwargs.get("skip-semantic") != "yes": self.run_semantic_check(constants.SEMANTIC_PREREMOVE, patch_list) for patch_id in patch_list: msg = "Removing patch: %s" % patch_id LOG.info(msg) audit_log_info(msg) if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE: msg = "%s is not in the repo" % patch_id LOG.info(msg) msg_info += msg + "\n" continue repo_changed = True for rpmname in self.patch_data.contents[patch_id]: patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"] rpmfile = self.get_store_filename(patch_sw_version, rpmname) if not os.path.isfile(rpmfile): msg = "Could not find rpm: %s" % rpmfile LOG.error(msg) raise RpmFail(msg) repo_filename = self.get_repo_filename(patch_sw_version, rpmname) if repo_filename is None: msg = "Failed to determine repo path for %s" % rpmfile LOG.exception(msg) raise RpmFail(msg) try: os.remove(repo_filename) except OSError: msg = "Failed to remove RPM" LOG.exception(msg) raise RpmFail(msg) try: # Move the metadata to the available dir shutil.move("%s/%s-metadata.xml" % (applied_dir, patch_id), "%s/%s-metadata.xml" % (avail_dir, patch_id)) msg_info += "%s has been removed from the repo\n" % patch_id except shutil.Error: msg = "Failed to move the metadata for %s" % patch_id LOG.exception(msg) raise MetadataFail(msg) self.patch_data.metadata[patch_id]["repostate"] = constants.AVAILABLE if len(self.hosts) > 0: self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE else: self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN self.hosts_lock.acquire() self.interim_state[patch_id] = list(self.hosts) self.hosts_lock.release() if repo_changed: # Update the repo self.patch_data.gen_groups_xml() for ver, rdir in repo_dir.items(): try: output = subprocess.check_output(["createrepo", "--update", "-g", "comps.xml", rdir], stderr=subprocess.STDOUT) LOG.info("Repo[%s] updated:\n%s", ver, output) except subprocess.CalledProcessError: msg = "Failed to update the repo for %s" % ver LOG.exception(msg) raise PatchFail(msg) else: LOG.info("Repository is unchanged") return dict(info=msg_info, warning=msg_warning, error=msg_error) def patch_delete_api(self, patch_ids): """ Delete patches :return: """ msg_info = "" msg_warning = "" msg_error = "" # Protect against duplications patch_list = sorted(list(set(patch_ids))) msg = "Deleting patches: %s" % ",".join(patch_list) LOG.info(msg) audit_log_info(msg) # Verify patches exist and are in proper state first id_verification = True for patch_id in patch_list: if patch_id not in self.patch_data.metadata: msg = "Patch %s does not exist" % patch_id LOG.error(msg) msg_error += msg + "\n" id_verification = False continue # Get the aggregated patch state, if possible patchstate = constants.UNKNOWN if patch_id in self.patch_data.metadata: patchstate = self.patch_data.metadata[patch_id]["patchstate"] if self.patch_data.metadata[patch_id]["repostate"] != constants.AVAILABLE or \ (patchstate != constants.AVAILABLE and patchstate != constants.UNKNOWN): msg = "Patch %s not in Available state" % patch_id LOG.error(msg) msg_error += msg + "\n" id_verification = False continue if not id_verification: return dict(info=msg_info, warning=msg_warning, error=msg_error) # Handle operation for patch_id in patch_list: for rpmname in self.patch_data.contents[patch_id]: patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"] rpmfile = self.get_store_filename(patch_sw_version, rpmname) if not os.path.isfile(rpmfile): # We're deleting the patch anyway, so the missing file # doesn't really matter continue try: os.remove(rpmfile) except OSError: msg = "Failed to remove RPM %s" % rpmfile LOG.exception(msg) raise RpmFail(msg) for action in constants.SEMANTIC_ACTIONS: action_file = os.path.join(semantics_dir, action, patch_id) if not os.path.isfile(action_file): continue try: os.remove(action_file) except OSError: msg = "Failed to remove semantic %s" % action_file LOG.exception(msg) raise SemanticFail(msg) try: # Delete the metadata os.remove("%s/%s-metadata.xml" % (avail_dir, patch_id)) except OSError: msg = "Failed to remove metadata for %s" % patch_id LOG.exception(msg) raise MetadataFail(msg) self.patch_data.delete_patch(patch_id) msg = "%s has been deleted" % patch_id LOG.info(msg) msg_info += msg + "\n" return dict(info=msg_info, warning=msg_warning, error=msg_error) def patch_init_release_api(self, release): """ Create an empty repo for a new release :return: """ msg_info = "" msg_warning = "" msg_error = "" msg = "Initializing repo for: %s" % release LOG.info(msg) audit_log_info(msg) if release == SW_VERSION: msg = "Rejected: Requested release %s is running release" % release msg_error += msg + "\n" LOG.info(msg) return dict(info=msg_info, warning=msg_warning, error=msg_error) # Refresh data self.base_pkgdata.loaddirs() self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE) self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED) self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED) repo_dir[release] = "%s/rel-%s" % (repo_root_dir, release) # Verify the release doesn't already exist if os.path.exists(repo_dir[release]): msg = "Patch repository for %s already exists" % release msg_info += msg + "\n" LOG.info(msg) return dict(info=msg_info, warning=msg_warning, error=msg_error) # Generate the groups xml self.patch_data.gen_release_groups_xml(release) # Create the repo try: output = subprocess.check_output(["createrepo", "--update", "-g", "comps.xml", repo_dir[release]], stderr=subprocess.STDOUT) LOG.info("Repo[%s] updated:\n%s", release, output) except subprocess.CalledProcessError: msg = "Failed to update the repo for %s" % release LOG.exception(msg) # Wipe out what was created shutil.rmtree(repo_dir[release]) del repo_dir[release] raise PatchFail(msg) return dict(info=msg_info, warning=msg_warning, error=msg_error) def patch_del_release_api(self, release): """ Delete the repo and patches for second release :return: """ msg_info = "" msg_warning = "" msg_error = "" msg = "Deleting repo and patches for: %s" % release LOG.info(msg) audit_log_info(msg) if release == SW_VERSION: msg = "Rejected: Requested release %s is running release" % release msg_error += msg + "\n" LOG.info(msg) return dict(info=msg_info, warning=msg_warning, error=msg_error) # Delete patch XML files for patch_id in list(self.patch_data.metadata): if self.patch_data.metadata[patch_id]["sw_version"] != release: continue if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED: mdir = applied_dir elif self.patch_data.metadata[patch_id]["repostate"] == constants.COMMITTED: mdir = committed_dir else: mdir = avail_dir for action in constants.SEMANTIC_ACTIONS: action_file = os.path.join(semantics_dir, action, patch_id) if not os.path.isfile(action_file): continue try: os.remove(action_file) except OSError: msg = "Failed to remove semantic %s" % action_file LOG.exception(msg) raise SemanticFail(msg) try: # Delete the metadata os.remove("%s/%s-metadata.xml" % (mdir, patch_id)) except OSError: msg = "Failed to remove metadata for %s" % patch_id LOG.exception(msg) # Refresh patch data self.patch_data = PatchData() self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE) self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED) self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED) raise MetadataFail(msg) # Delete the packages dir package_dir[release] = "%s/%s" % (root_package_dir, release) if os.path.exists(package_dir[release]): try: shutil.rmtree(package_dir[release]) except shutil.Error: msg = "Failed to delete package dir for %s" % release LOG.exception(msg) del package_dir[release] # Verify the release exists repo_dir[release] = "%s/rel-%s" % (repo_root_dir, release) if not os.path.exists(repo_dir[release]): # Nothing to do msg = "Patch repository for %s does not exist" % release msg_info += msg + "\n" LOG.info(msg) del repo_dir[release] # Refresh patch data self.patch_data = PatchData() self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE) self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED) self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED) return dict(info=msg_info, warning=msg_warning, error=msg_error) # Delete the repo try: shutil.rmtree(repo_dir[release]) except shutil.Error: msg = "Failed to delete repo for %s" % release LOG.exception(msg) del repo_dir[release] if self.base_pkgdata is not None and release in self.base_pkgdata.pkgs: del self.base_pkgdata.pkgs[release] # Refresh patch data self.patch_data = PatchData() self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE) self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED) self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED) return dict(info=msg_info, warning=msg_warning, error=msg_error) def patch_query_what_requires(self, patch_ids): """ Query the known patches to see which have dependencies on the specified patches :return: """ msg_info = "" msg_warning = "" msg_error = "" msg = "Querying what requires patches: %s" % ",".join(patch_ids) LOG.info(msg) audit_log_info(msg) # First, verify that all specified patches exist id_verification = True for patch_id in patch_ids: if patch_id not in self.patch_data.metadata: msg = "Patch %s does not exist" % patch_id LOG.error(msg) msg_error += msg + "\n" id_verification = False if not id_verification: return dict(info=msg_info, warning=msg_warning, error=msg_error) required_patches = {} for patch_iter in list(self.patch_data.metadata): for req_patch in self.patch_data.metadata[patch_iter]["requires"]: if req_patch not in patch_ids: continue if req_patch not in required_patches: required_patches[req_patch] = [] required_patches[req_patch].append(patch_iter) for patch_id in patch_ids: if patch_id in required_patches: iter_patch_list = required_patches[patch_id] msg_info += "%s is required by: %s\n" % (patch_id, ", ".join(sorted(iter_patch_list))) else: msg_info += "%s is not required by any patches.\n" % patch_id return dict(info=msg_info, warning=msg_warning, error=msg_error) def patch_sync(self): # Increment the patch_op_counter here self.inc_patch_op_counter() self.patch_data_lock.acquire() # self.patch_data.load_all() self.check_patch_states() self.patch_data_lock.release() if self.sock_out is None: return True # Send the sync requests self.controller_neighbours_lock.acquire() for n in self.controller_neighbours: self.controller_neighbours[n].clear_synced() self.controller_neighbours_lock.release() msg = PatchMessageSyncReq() self.socket_lock.acquire() msg.send(self.sock_out) self.socket_lock.release() # Now we wait, up to two mins... TODO: Wait on a condition my_ip = cfg.get_mgmt_ip() sync_rc = False max_time = time.time() + 120 while time.time() < max_time: all_done = True self.controller_neighbours_lock.acquire() for n in self.controller_neighbours: if n != my_ip and not self.controller_neighbours[n].get_synced(): all_done = False self.controller_neighbours_lock.release() if all_done: LOG.info("Sync complete") sync_rc = True break time.sleep(0.5) # Send hellos to the hosts now, to get queries performed hello_agent = PatchMessageHelloAgent() self.socket_lock.acquire() hello_agent.send(self.sock_out) self.socket_lock.release() if not sync_rc: LOG.info("Timed out waiting for sync completion") return sync_rc def patch_query_cached(self, **kwargs): query_state = None if "show" in kwargs: if kwargs["show"] == "available": query_state = constants.AVAILABLE elif kwargs["show"] == "applied": query_state = constants.APPLIED elif kwargs["show"] == "committed": query_state = constants.COMMITTED query_release = None if "release" in kwargs: query_release = kwargs["release"] results = {} self.patch_data_lock.acquire() if query_state is None and query_release is None: # Return everything results = self.patch_data.metadata else: # Filter results for patch_id, data in self.patch_data.metadata.items(): if query_state is not None and data["repostate"] != query_state: continue if query_release is not None and data["sw_version"] != query_release: continue results[patch_id] = data self.patch_data_lock.release() return results def patch_query_specific_cached(self, patch_ids): audit_log_info("Patch show") results = {"metadata": {}, "contents": {}, "error": ""} self.patch_data_lock.acquire() for patch_id in patch_ids: if patch_id not in list(self.patch_data.metadata): results["error"] += "%s is unrecognized\n" % patch_id for patch_id, data in self.patch_data.metadata.items(): if patch_id in patch_ids: results["metadata"][patch_id] = data for patch_id, data in self.patch_data.contents.items(): if patch_id in patch_ids: results["contents"][patch_id] = data self.patch_data_lock.release() return results def get_dependencies(self, patch_ids, recursive): dependencies = set() patch_added = False self.patch_data_lock.acquire() # Add patches to workset for patch_id in sorted(patch_ids): dependencies.add(patch_id) patch_added = True while patch_added: patch_added = False for patch_id in sorted(dependencies): for req in self.patch_data.metadata[patch_id]["requires"]: if req not in dependencies: dependencies.add(req) patch_added = recursive self.patch_data_lock.release() return sorted(dependencies) def patch_query_dependencies(self, patch_ids, **kwargs): msg = "Patch query-dependencies %s" % patch_ids LOG.info(msg) audit_log_info(msg) failure = False results = {"patches": [], "error": ""} recursive = False if kwargs.get("recursive") == "yes": recursive = True self.patch_data_lock.acquire() # Verify patch IDs for patch_id in sorted(patch_ids): if patch_id not in list(self.patch_data.metadata): errormsg = "%s is unrecognized\n" % patch_id LOG.info("patch_query_dependencies: %s", errormsg) results["error"] += errormsg failure = True self.patch_data_lock.release() if failure: LOG.info("patch_query_dependencies failed") return results results["patches"] = self.get_dependencies(patch_ids, recursive) return results def patch_commit(self, patch_ids, dry_run=False): msg = "Patch commit %s" % patch_ids LOG.info(msg) audit_log_info(msg) try: if not os.path.exists(committed_dir): os.makedirs(committed_dir) except os.error: msg = "Failed to create %s" % committed_dir LOG.exception(msg) raise PatchFail(msg) failure = False recursive = True keep = {} cleanup = {} cleanup_files = set() results = {"info": "", "error": ""} # Ensure there are only REL patches non_rel_list = [] self.patch_data_lock.acquire() for patch_id in self.patch_data.metadata: if self.patch_data.metadata[patch_id]['status'] != constants.STATUS_RELEASED: non_rel_list.append(patch_id) self.patch_data_lock.release() if len(non_rel_list) > 0: errormsg = "A commit cannot be performed with non-REL status patches in the system:\n" for patch_id in non_rel_list: errormsg += " %s\n" % patch_id LOG.info("patch_commit rejected: %s", errormsg) results["error"] += errormsg return results # Verify patch IDs self.patch_data_lock.acquire() for patch_id in sorted(patch_ids): if patch_id not in list(self.patch_data.metadata): errormsg = "%s is unrecognized\n" % patch_id LOG.info("patch_commit: %s", errormsg) results["error"] += errormsg failure = True self.patch_data_lock.release() if failure: LOG.info("patch_commit: Failed patch ID check") return results commit_list = self.get_dependencies(patch_ids, recursive) # Check patch states avail_list = [] self.patch_data_lock.acquire() for patch_id in commit_list: if self.patch_data.metadata[patch_id]['patchstate'] != constants.APPLIED \ and self.patch_data.metadata[patch_id]['patchstate'] != constants.COMMITTED: avail_list.append(patch_id) self.patch_data_lock.release() if len(avail_list) > 0: errormsg = "The following patches are not applied and cannot be committed:\n" for patch_id in avail_list: errormsg += " %s\n" % patch_id LOG.info("patch_commit rejected: %s", errormsg) results["error"] += errormsg return results # Get list of packages self.patch_data_lock.acquire() for patch_id in commit_list: patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"] if patch_sw_version not in keep: keep[patch_sw_version] = {} if patch_sw_version not in cleanup: cleanup[patch_sw_version] = {} for rpmname in self.patch_data.contents[patch_id]: try: pkgname, arch, pkgver = parse_rpm_filename(rpmname) except ValueError as e: self.patch_data_lock.release() raise e if pkgname not in keep[patch_sw_version]: keep[patch_sw_version][pkgname] = {arch: pkgver} continue elif arch not in keep[patch_sw_version][pkgname]: keep[patch_sw_version][pkgname][arch] = pkgver continue # Compare versions keep_pkgver = keep[patch_sw_version][pkgname][arch] if pkgver > keep_pkgver: if pkgname not in cleanup[patch_sw_version]: cleanup[patch_sw_version][pkgname] = {arch: [keep_pkgver]} elif arch not in cleanup[patch_sw_version][pkgname]: cleanup[patch_sw_version][pkgname][arch] = [keep_pkgver] else: cleanup[patch_sw_version][pkgname][arch].append(keep_pkgver) # Find the rpmname keep_rpmname = keep_pkgver.generate_rpm_filename(pkgname, arch) store_filename = self.get_store_filename(patch_sw_version, keep_rpmname) if store_filename is not None and os.path.exists(store_filename): cleanup_files.add(store_filename) repo_filename = self.get_repo_filename(patch_sw_version, keep_rpmname) if repo_filename is not None and os.path.exists(repo_filename): cleanup_files.add(repo_filename) # Keep the new pkgver keep[patch_sw_version][pkgname][arch] = pkgver else: # Put this pkg in the cleanup list if pkgname not in cleanup[patch_sw_version]: cleanup[patch_sw_version][pkgname] = {arch: [pkgver]} elif arch not in cleanup[patch_sw_version][pkgname]: cleanup[patch_sw_version][pkgname][arch] = [pkgver] else: cleanup[patch_sw_version][pkgname][arch].append(pkgver) store_filename = self.get_store_filename(patch_sw_version, rpmname) if store_filename is not None and os.path.exists(store_filename): cleanup_files.add(store_filename) repo_filename = self.get_repo_filename(patch_sw_version, rpmname) if repo_filename is not None and os.path.exists(repo_filename): cleanup_files.add(repo_filename) self.patch_data_lock.release() # Calculate disk space disk_space = 0 for rpmfile in cleanup_files: statinfo = os.stat(rpmfile) disk_space += statinfo.st_size if dry_run: results["info"] = "This commit operation would free %0.2f MiB" % (disk_space / (1024.0 * 1024.0)) # pylint: disable=W1619 return results # Do the commit # Move the metadata to the committed dir for patch_id in commit_list: metadata_fname = "%s-metadata.xml" % patch_id applied_fname = os.path.join(applied_dir, metadata_fname) committed_fname = os.path.join(committed_dir, metadata_fname) if os.path.exists(applied_fname): try: shutil.move(applied_fname, committed_fname) except shutil.Error: msg = "Failed to move the metadata for %s" % patch_id LOG.exception(msg) raise MetadataFail(msg) # Delete the files for rpmfile in cleanup_files: try: os.remove(rpmfile) except OSError: msg = "Failed to remove: %s" % rpmfile LOG.exception(msg) raise MetadataFail(msg) # Update the repo self.patch_data.gen_groups_xml() for ver, rdir in repo_dir.items(): try: output = subprocess.check_output(["createrepo", "--update", "-g", "comps.xml", rdir], stderr=subprocess.STDOUT) LOG.info("Repo[%s] updated:\n%s", ver, output) except subprocess.CalledProcessError: msg = "Failed to update the repo for %s" % ver LOG.exception(msg) raise PatchFail(msg) self.patch_data.load_all() results["info"] = "The patches have been committed." return results def query_host_cache(self): output = [] self.hosts_lock.acquire() for nbr in list(self.hosts): host = self.hosts[nbr].get_dict() host["interim_state"] = False for patch_id in list(pc.interim_state): if nbr in pc.interim_state[patch_id]: host["interim_state"] = True output.append(host) self.hosts_lock.release() return output def any_patch_host_installing(self): rc = False self.hosts_lock.acquire() for host in self.hosts.values(): if host.state == constants.PATCH_AGENT_STATE_INSTALLING: rc = True break self.hosts_lock.release() return rc def patch_host_install(self, host_ip, force, async_req=False): msg_info = "" msg_warning = "" msg_error = "" ip = host_ip self.hosts_lock.acquire() # If not in hosts table, maybe a hostname was used instead if host_ip not in self.hosts: try: ip = utils.gethostbyname(host_ip) if ip not in self.hosts: # Translated successfully, but IP isn't in the table. # Raise an exception to drop out to the failure handling raise PatchError("Host IP (%s) not in table" % ip) except Exception: self.hosts_lock.release() msg = "Unknown host specified: %s" % host_ip msg_error += msg + "\n" LOG.error("Error in host-install: %s", msg) return dict(info=msg_info, warning=msg_warning, error=msg_error) msg = "Running host-install for %s (%s), force=%s, async_req=%s" % (host_ip, ip, force, async_req) LOG.info(msg) audit_log_info(msg) if self.allow_insvc_patching: LOG.info("Allowing in-service patching") force = True self.hosts[ip].install_pending = True self.hosts[ip].install_status = False self.hosts[ip].install_reject_reason = None self.hosts_lock.release() installreq = PatchMessageAgentInstallReq() installreq.ip = ip installreq.force = force installreq.encode() self.socket_lock.acquire() installreq.send(self.sock_out) self.socket_lock.release() if async_req: # async_req install requested, so return now msg = "Patch installation request sent to %s." % self.hosts[ip].hostname msg_info += msg + "\n" LOG.info("host-install async_req: %s", msg) return dict(info=msg_info, warning=msg_warning, error=msg_error) # Now we wait, up to ten mins... TODO: Wait on a condition resp_rx = False max_time = time.time() + 600 while time.time() < max_time: self.hosts_lock.acquire() if ip not in self.hosts: # The host aged out while we were waiting self.hosts_lock.release() msg = "Agent expired while waiting: %s" % ip msg_error += msg + "\n" LOG.error("Error in host-install: %s", msg) break if not self.hosts[ip].install_pending: # We got a response resp_rx = True if self.hosts[ip].install_status: msg = "Patch installation was successful on %s." % self.hosts[ip].hostname msg_info += msg + "\n" LOG.info("host-install: %s", msg) elif self.hosts[ip].install_reject_reason: msg = "Patch installation rejected by %s. %s" % ( self.hosts[ip].hostname, self.hosts[ip].install_reject_reason) msg_error += msg + "\n" LOG.error("Error in host-install: %s", msg) else: msg = "Patch installation failed on %s." % self.hosts[ip].hostname msg_error += msg + "\n" LOG.error("Error in host-install: %s", msg) self.hosts_lock.release() break self.hosts_lock.release() time.sleep(0.5) if not resp_rx: msg = "Timeout occurred while waiting response from %s." % ip msg_error += msg + "\n" LOG.error("Error in host-install: %s", msg) return dict(info=msg_info, warning=msg_warning, error=msg_error) def drop_host(self, host_ip, sync_nbr=True): msg_info = "" msg_warning = "" msg_error = "" ip = host_ip self.hosts_lock.acquire() # If not in hosts table, maybe a hostname was used instead if host_ip not in self.hosts: try: # Because the host may be getting dropped due to deletion, # we may be unable to do a hostname lookup. Instead, we'll # iterate through the table here. for host in list(self.hosts): if host_ip == self.hosts[host].hostname: ip = host break if ip not in self.hosts: # Translated successfully, but IP isn't in the table. # Raise an exception to drop out to the failure handling raise PatchError("Host IP (%s) not in table" % ip) except Exception: self.hosts_lock.release() msg = "Unknown host specified: %s" % host_ip msg_error += msg + "\n" LOG.error("Error in drop-host: %s", msg) return dict(info=msg_info, warning=msg_warning, error=msg_error) msg = "Running drop-host for %s (%s)" % (host_ip, ip) LOG.info(msg) audit_log_info(msg) del self.hosts[ip] for patch_id in list(self.interim_state): if ip in self.interim_state[patch_id]: self.interim_state[patch_id].remove(ip) self.hosts_lock.release() if sync_nbr: sync_msg = PatchMessageDropHostReq() sync_msg.ip = ip self.socket_lock.acquire() sync_msg.send(self.sock_out) self.socket_lock.release() return dict(info=msg_info, warning=msg_warning, error=msg_error) def is_applied(self, patch_ids): all_applied = True self.patch_data_lock.acquire() for patch_id in patch_ids: if patch_id not in self.patch_data.metadata: all_applied = False break if self.patch_data.metadata[patch_id]["patchstate"] != constants.APPLIED: all_applied = False break self.patch_data_lock.release() return all_applied def is_available(self, patch_ids): all_available = True self.patch_data_lock.acquire() for patch_id in patch_ids: if patch_id not in self.patch_data.metadata: all_available = False break if self.patch_data.metadata[patch_id]["patchstate"] != \ constants.AVAILABLE: all_available = False break self.patch_data_lock.release() return all_available def report_app_dependencies(self, patch_ids, **kwargs): """ Handle report of application dependencies """ if "app" not in kwargs: raise PatchInvalidRequest appname = kwargs.get("app") LOG.info("Handling app dependencies report: app=%s, patch_ids=%s", appname, ','.join(patch_ids)) self.patch_data_lock.acquire() if len(patch_ids) == 0: if appname in self.app_dependencies: del self.app_dependencies[appname] else: self.app_dependencies[appname] = patch_ids try: tmpfile, tmpfname = tempfile.mkstemp( prefix=app_dependency_basename, dir=constants.PATCH_STORAGE_DIR) os.write(tmpfile, json.dumps(self.app_dependencies)) os.close(tmpfile) os.rename(tmpfname, app_dependency_filename) except Exception: LOG.exception("Failed in report_app_dependencies") raise PatchFail("Internal failure") finally: self.patch_data_lock.release() return True def query_app_dependencies(self): """ Query application dependencies """ self.patch_data_lock.acquire() data = self.app_dependencies self.patch_data_lock.release() return dict(data) # The wsgiref.simple_server module has an error handler that catches # and prints any exceptions that occur during the API handling to stderr. # This means the patching sys.excepthook handler that logs uncaught # exceptions is never called, and those exceptions are lost. # # To get around this, we're subclassing the simple_server.ServerHandler # in order to replace the handle_error method with a custom one that # logs the exception instead, and will set a global flag to shutdown # the server and reset. # class MyServerHandler(simple_server.ServerHandler): def handle_error(self): LOG.exception('An uncaught exception has occurred:') if not self.headers_sent: self.result = self.error_output(self.environ, self.start_response) self.finish_response() global keep_running keep_running = False def get_handler_cls(): cls = simple_server.WSGIRequestHandler # old-style class doesn't support super class MyHandler(cls, object): def address_string(self): # In the future, we could provide a config option to allow reverse DNS lookup return self.client_address[0] # Overload the handle function to use our own MyServerHandler def handle(self): """Handle a single HTTP request""" self.raw_requestline = self.rfile.readline() if not self.parse_request(): # An error code has been sent, just exit return handler = MyServerHandler( self.rfile, self.wfile, self.get_stderr(), self.get_environ() ) handler.request_handler = self # pylint: disable=attribute-defined-outside-init handler.run(self.server.get_app()) return MyHandler class PatchControllerApiThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.wsgi = None def run(self): host = "127.0.0.1" port = cfg.api_port try: # In order to support IPv6, server_class.address_family must be # set to the correct address family. Because the unauthenticated # API always uses IPv4 for the loopback address, the address_family # variable cannot be set directly in the WSGIServer class, so a # local subclass needs to be created for the call to make_server, # where the correct address_family can be specified. class server_class(simple_server.WSGIServer): pass server_class.address_family = socket.AF_INET self.wsgi = simple_server.make_server( host, port, app.VersionSelectorApplication(), server_class=server_class, handler_class=get_handler_cls()) self.wsgi.socket.settimeout(api_socket_timeout) global keep_running while keep_running: self.wsgi.handle_request() # Call garbage collect after wsgi request is handled, # to ensure any open file handles are closed in the case # of an upload. gc.collect() except Exception: # Log all exceptions LOG.exception("Error occurred during request processing") global thread_death thread_death.set() def kill(self): # Must run from other thread if self.wsgi is not None: self.wsgi.shutdown() class PatchControllerAuthApiThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) # LOG.info ("Initializing Authenticated API thread") self.wsgi = None def run(self): host = CONF.auth_api_bind_ip port = CONF.auth_api_port if host is None: host = utils.get_versioned_address_all() try: # Can only launch authenticated server post-config while not os.path.exists('/etc/platform/.initial_config_complete'): time.sleep(5) # In order to support IPv6, server_class.address_family must be # set to the correct address family. Because the unauthenticated # API always uses IPv4 for the loopback address, the address_family # variable cannot be set directly in the WSGIServer class, so a # local subclass needs to be created for the call to make_server, # where the correct address_family can be specified. class server_class(simple_server.WSGIServer): pass server_class.address_family = utils.get_management_family() self.wsgi = simple_server.make_server( host, port, auth_app.VersionSelectorApplication(), server_class=server_class, handler_class=get_handler_cls()) # self.wsgi.serve_forever() self.wsgi.socket.settimeout(api_socket_timeout) global keep_running while keep_running: self.wsgi.handle_request() # Call garbage collect after wsgi request is handled, # to ensure any open file handles are closed in the case # of an upload. gc.collect() except Exception: # Log all exceptions LOG.exception("Authorized API failure: Error occurred during request processing") def kill(self): # Must run from other thread if self.wsgi is not None: self.wsgi.shutdown() class PatchControllerMainThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) # LOG.info ("Initializing Main thread") def run(self): global pc global thread_death # LOG.info ("In Main thread") try: sock_in = pc.setup_socket() while sock_in is None: # Check every thirty seconds? # Once we've got a conf file, tied into packstack, # we'll get restarted when the file is updated, # and this should be unnecessary. time.sleep(30) sock_in = pc.setup_socket() # Ok, now we've got our socket. Let's start with a hello! pc.socket_lock.acquire() hello = PatchMessageHello() hello.send(pc.sock_out) hello_agent = PatchMessageHelloAgent() hello_agent.send(pc.sock_out) pc.socket_lock.release() # Send hello every thirty seconds hello_timeout = time.time() + 30.0 remaining = 30 agent_query_conns = [] while True: # Check to see if any other thread has died if thread_death.is_set(): LOG.info("Detected thread death. Terminating") return # Check for in-service patch restart flag if os.path.exists(insvc_patch_restart_controller): LOG.info("In-service patch restart flag detected. Exiting.") global keep_running keep_running = False os.remove(insvc_patch_restart_controller) return inputs = [pc.sock_in] + agent_query_conns outputs = [] # LOG.info("Running select, remaining=%d", remaining) rlist, wlist, xlist = select.select(inputs, outputs, inputs, remaining) if (len(rlist) == 0 and len(wlist) == 0 and len(xlist) == 0): # Timeout hit pc.audit_socket() # LOG.info("Checking sockets") for s in rlist: data = '' addr = None msg = None if s == pc.sock_in: # Receive from UDP pc.socket_lock.acquire() data, addr = s.recvfrom(1024) pc.socket_lock.release() else: # Receive from TCP while True: try: packet = s.recv(1024) except socket.error: LOG.exception("Socket error on recv") data = '' break if packet: data += packet.decode() if data == '': break try: json.loads(data) break except ValueError: # Message is incomplete continue else: LOG.info('End of TCP message received') break if data == '': # Connection dropped agent_query_conns.remove(s) s.close() continue # Get the TCP endpoint address addr = s.getpeername() msgdata = json.loads(data) # For now, discard any messages that are not msgversion==1 if 'msgversion' in msgdata and msgdata['msgversion'] != 1: continue if 'msgtype' in msgdata: if msgdata['msgtype'] == messages.PATCHMSG_HELLO: msg = PatchMessageHello() elif msgdata['msgtype'] == messages.PATCHMSG_HELLO_ACK: msg = PatchMessageHelloAck() elif msgdata['msgtype'] == messages.PATCHMSG_SYNC_REQ: msg = PatchMessageSyncReq() elif msgdata['msgtype'] == messages.PATCHMSG_SYNC_COMPLETE: msg = PatchMessageSyncComplete() elif msgdata['msgtype'] == messages.PATCHMSG_HELLO_AGENT_ACK: msg = PatchMessageHelloAgentAck() elif msgdata['msgtype'] == messages.PATCHMSG_QUERY_DETAILED_RESP: msg = PatchMessageQueryDetailedResp() elif msgdata['msgtype'] == messages.PATCHMSG_AGENT_INSTALL_RESP: msg = PatchMessageAgentInstallResp() elif msgdata['msgtype'] == messages.PATCHMSG_DROP_HOST_REQ: msg = PatchMessageDropHostReq() if msg is None: msg = messages.PatchMessage() msg.decode(msgdata) if s == pc.sock_in: msg.handle(pc.sock_out, addr) else: msg.handle(s, addr) # We can drop the connection after a query response if msg.msgtype == messages.PATCHMSG_QUERY_DETAILED_RESP and s != pc.sock_in: agent_query_conns.remove(s) s.shutdown(socket.SHUT_RDWR) s.close() while len(stale_hosts) > 0 and len(agent_query_conns) <= 5: ip = stale_hosts.pop() try: agent_sock = socket.create_connection((ip, cfg.agent_port)) query = PatchMessageQueryDetailed() query.send(agent_sock) agent_query_conns.append(agent_sock) except Exception: # Put it back on the list stale_hosts.append(ip) remaining = int(hello_timeout - time.time()) if remaining <= 0 or remaining > 30: hello_timeout = time.time() + 30.0 remaining = 30 pc.socket_lock.acquire() hello = PatchMessageHello() hello.send(pc.sock_out) hello_agent = PatchMessageHelloAgent() hello_agent.send(pc.sock_out) pc.socket_lock.release() # Age out neighbours pc.controller_neighbours_lock.acquire() nbrs = list(pc.controller_neighbours) for n in nbrs: # Age out controllers after 2 minutes if pc.controller_neighbours[n].get_age() >= 120: LOG.info("Aging out controller %s from table", n) del pc.controller_neighbours[n] pc.controller_neighbours_lock.release() pc.hosts_lock.acquire() nbrs = list(pc.hosts) for n in nbrs: # Age out hosts after 1 hour if pc.hosts[n].get_age() >= 3600: LOG.info("Aging out host %s from table", n) del pc.hosts[n] for patch_id in list(pc.interim_state): if n in pc.interim_state[patch_id]: pc.interim_state[patch_id].remove(n) pc.hosts_lock.release() except Exception: # Log all exceptions LOG.exception("Error occurred during request processing") thread_death.set() def main(): configure_logging() cfg.read_config() # daemon.pidlockfile.write_pid_to_pidfile(pidfile_path) global thread_death thread_death = threading.Event() # Set the TMPDIR environment variable to /scratch so that any modules # that create directories with tempfile will not use /tmp os.environ['TMPDIR'] = '/scratch' global pc pc = PatchController() LOG.info("launching") api_thread = PatchControllerApiThread() auth_api_thread = PatchControllerAuthApiThread() main_thread = PatchControllerMainThread() api_thread.start() auth_api_thread.start() main_thread.start() thread_death.wait() global keep_running keep_running = False api_thread.join() auth_api_thread.join() main_thread.join()