update/sw-patch/cgcs-patch/cgcs_patch/patch_controller.py

2513 lines
90 KiB
Python

"""
Copyright (c) 2014-2022 Wind River Systems, Inc.
SPDX-License-Identifier: Apache-2.0
"""
import gc
import json
import os
import select
import shutil
import six
from six.moves import configparser
import socket
import subprocess
import tarfile
import tempfile
import threading
import time
from wsgiref import simple_server
from oslo_config import cfg as oslo_cfg
from cgcs_patch import ostree_utils
from cgcs_patch.api import app
from cgcs_patch.authapi import app as auth_app
from cgcs_patch.patch_functions import configure_logging
from cgcs_patch.patch_functions import BasePackageData
from cgcs_patch.patch_functions import avail_dir
from cgcs_patch.patch_functions import applied_dir
from cgcs_patch.patch_functions import committed_dir
from cgcs_patch.patch_functions import PatchFile
from cgcs_patch.patch_functions import package_dir
from cgcs_patch.patch_functions import repo_dir
from cgcs_patch.patch_functions import semantics_dir
from cgcs_patch.patch_functions import SW_VERSION
from cgcs_patch.patch_functions import root_package_dir
from cgcs_patch.exceptions import MetadataFail
from cgcs_patch.exceptions import OSTreeCommandFail
from cgcs_patch.exceptions import OSTreeTarFail
from cgcs_patch.exceptions import PatchError
from cgcs_patch.exceptions import PatchFail
from cgcs_patch.exceptions import PatchInvalidRequest
from cgcs_patch.exceptions import PatchValidationFailure
from cgcs_patch.exceptions import PatchMismatchFailure
from cgcs_patch.exceptions import SemanticFail
from cgcs_patch.patch_functions import LOG
from cgcs_patch.patch_functions import audit_log_info
from cgcs_patch.patch_functions import patch_dir
from cgcs_patch.patch_functions import repo_root_dir
from cgcs_patch.patch_functions import PatchData
from cgcs_patch.base import PatchService
import cgcs_patch.config as cfg
import cgcs_patch.utils as utils
import cgcs_patch.messages as messages
import cgcs_patch.constants as constants
from tsconfig.tsconfig import INITIAL_CONFIG_COMPLETE_FLAG
CONF = oslo_cfg.CONF
pidfile_path = "/var/run/patch_controller.pid"
pc = None
state_file = "%s/.controller.state" % constants.PATCH_STORAGE_DIR
app_dependency_basename = "app_dependencies.json"
app_dependency_filename = "%s/%s" % (constants.PATCH_STORAGE_DIR, app_dependency_basename)
insvc_patch_restart_controller = "/run/patching/.restart.patch-controller"
stale_hosts = []
pending_queries = []
thread_death = None
keep_running = True
# Limit socket blocking to 5 seconds to allow for thread to shutdown
api_socket_timeout = 5.0
class ControllerNeighbour(object):
def __init__(self):
self.last_ack = 0
self.synced = False
def rx_ack(self):
self.last_ack = time.time()
def get_age(self):
return int(time.time() - self.last_ack)
def rx_synced(self):
self.synced = True
def clear_synced(self):
self.synced = False
def get_synced(self):
return self.synced
class AgentNeighbour(object):
def __init__(self, ip):
self.ip = ip
self.last_ack = 0
self.last_query_id = 0
self.out_of_date = False
self.hostname = "n/a"
self.requires_reboot = False
self.patch_failed = False
self.stale = False
self.pending_query = False
self.latest_sysroot_commit = None
self.nodetype = None
self.sw_version = "unknown"
self.subfunctions = []
self.state = None
def rx_ack(self,
hostname,
out_of_date,
requires_reboot,
query_id,
patch_failed,
sw_version,
state):
self.last_ack = time.time()
self.hostname = hostname
self.patch_failed = patch_failed
self.sw_version = sw_version
self.state = state
if out_of_date != self.out_of_date or requires_reboot != self.requires_reboot:
self.out_of_date = out_of_date
self.requires_reboot = requires_reboot
LOG.info("Agent %s (%s) reporting out_of_date=%s, requires_reboot=%s",
self.hostname,
self.ip,
self.out_of_date,
self.requires_reboot)
if self.last_query_id != query_id:
self.last_query_id = query_id
self.stale = True
if self.ip not in stale_hosts and self.ip not in pending_queries:
stale_hosts.append(self.ip)
def get_age(self):
return int(time.time() - self.last_ack)
def handle_query_detailed_resp(self,
latest_sysroot_commit,
nodetype,
sw_version,
subfunctions,
state):
self.latest_sysroot_commit = latest_sysroot_commit
self.nodetype = nodetype
self.stale = False
self.pending_query = False
self.sw_version = sw_version
self.subfunctions = subfunctions
self.state = state
if self.ip in pending_queries:
pending_queries.remove(self.ip)
if self.ip in stale_hosts:
stale_hosts.remove(self.ip)
def get_dict(self):
d = {"ip": self.ip,
"hostname": self.hostname,
"patch_current": not self.out_of_date,
"secs_since_ack": self.get_age(),
"patch_failed": self.patch_failed,
"stale_details": self.stale,
"latest_sysroot_commit": self.latest_sysroot_commit,
"nodetype": self.nodetype,
"subfunctions": self.subfunctions,
"sw_version": self.sw_version,
"state": self.state}
global pc
if self.out_of_date and not pc.allow_insvc_patching:
d["requires_reboot"] = True
else:
d["requires_reboot"] = self.requires_reboot
# Included for future enhancement, to allow per-node determination
# of in-service patching
d["allow_insvc_patching"] = pc.allow_insvc_patching
return d
class PatchMessageHello(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO)
self.patch_op_counter = 0
def decode(self, data):
messages.PatchMessage.decode(self, data)
if 'patch_op_counter' in data:
self.patch_op_counter = data['patch_op_counter']
def encode(self):
global pc
messages.PatchMessage.encode(self)
self.message['patch_op_counter'] = pc.patch_op_counter
def handle(self, sock, addr):
global pc
host = addr[0]
if host == cfg.get_mgmt_ip():
# Ignore messages from self
return
# Send response
if self.patch_op_counter > 0:
pc.handle_nbr_patch_op_counter(host, self.patch_op_counter)
resp = PatchMessageHelloAck()
resp.send(sock)
def send(self, sock):
global pc
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
class PatchMessageHelloAck(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_ACK)
def encode(self):
# Nothing to add, so just call the super class
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global pc
pc.controller_neighbours_lock.acquire()
if not addr[0] in pc.controller_neighbours:
pc.controller_neighbours[addr[0]] = ControllerNeighbour()
pc.controller_neighbours[addr[0]].rx_ack()
pc.controller_neighbours_lock.release()
def send(self, sock):
global pc
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
class PatchMessageSyncReq(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_SYNC_REQ)
def encode(self):
# Nothing to add to the SYNC_REQ, so just call the super class
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global pc
host = addr[0]
if host == cfg.get_mgmt_ip():
# Ignore messages from self
return
# We may need to do this in a separate thread, so that we continue to process hellos
LOG.info("Handling sync req")
pc.sync_from_nbr(host)
resp = PatchMessageSyncComplete()
resp.send(sock)
def send(self, sock):
global pc
LOG.info("sending sync req")
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
class PatchMessageSyncComplete(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_SYNC_COMPLETE)
def encode(self):
# Nothing to add to the SYNC_COMPLETE, so just call the super class
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global pc
LOG.info("Handling sync complete")
pc.controller_neighbours_lock.acquire()
if not addr[0] in pc.controller_neighbours:
pc.controller_neighbours[addr[0]] = ControllerNeighbour()
pc.controller_neighbours[addr[0]].rx_synced()
pc.controller_neighbours_lock.release()
def send(self, sock):
global pc
LOG.info("sending sync complete")
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
class PatchMessageHelloAgent(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_AGENT)
def encode(self):
global pc
messages.PatchMessage.encode(self)
self.message['patch_op_counter'] = pc.patch_op_counter
def handle(self, sock, addr):
LOG.error("Should not get here")
def send(self, sock):
global pc
self.encode()
message = json.dumps(self.message)
local_hostname = utils.ip_to_versioned_localhost(cfg.agent_mcast_group)
sock.sendto(str.encode(message), (pc.agent_address, cfg.agent_port))
sock.sendto(str.encode(message), (local_hostname, cfg.agent_port))
class PatchMessageSendLatestFeedCommit(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_SEND_LATEST_FEED_COMMIT)
def encode(self):
global pc
messages.PatchMessage.encode(self)
self.message['latest_feed_commit'] = pc.latest_feed_commit
def handle(self, sock, addr):
LOG.error("Should not get here")
def send(self, sock):
global pc
self.encode()
message = json.dumps(self.message)
local_hostname = utils.ip_to_versioned_localhost(cfg.agent_mcast_group)
sock.sendto(str.encode(message), (pc.agent_address, cfg.agent_port))
sock.sendto(str.encode(message), (local_hostname, cfg.agent_port))
class PatchMessageHelloAgentAck(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_AGENT_ACK)
self.query_id = 0
self.agent_out_of_date = False
self.agent_hostname = "n/a"
self.agent_requires_reboot = False
self.agent_patch_failed = False
self.agent_sw_version = "unknown"
self.agent_state = "unknown"
def decode(self, data):
messages.PatchMessage.decode(self, data)
if 'query_id' in data:
self.query_id = data['query_id']
if 'out_of_date' in data:
self.agent_out_of_date = data['out_of_date']
if 'hostname' in data:
self.agent_hostname = data['hostname']
if 'requires_reboot' in data:
self.agent_requires_reboot = data['requires_reboot']
if 'patch_failed' in data:
self.agent_patch_failed = data['patch_failed']
if 'sw_version' in data:
self.agent_sw_version = data['sw_version']
if 'state' in data:
self.agent_state = data['state']
def encode(self):
# Nothing to add, so just call the super class
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global pc
pc.hosts_lock.acquire()
if not addr[0] in pc.hosts:
pc.hosts[addr[0]] = AgentNeighbour(addr[0])
pc.hosts[addr[0]].rx_ack(self.agent_hostname,
self.agent_out_of_date,
self.agent_requires_reboot,
self.query_id,
self.agent_patch_failed,
self.agent_sw_version,
self.agent_state)
pc.hosts_lock.release()
def send(self, sock): # pylint: disable=unused-argument
LOG.error("Should not get here")
class PatchMessageQueryDetailed(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_QUERY_DETAILED)
def encode(self):
# Nothing to add to the message, so just call the super class
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
LOG.error("Should not get here")
def send(self, sock):
self.encode()
message = json.dumps(self.message)
sock.sendall(str.encode(message))
class PatchMessageQueryDetailedResp(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_QUERY_DETAILED_RESP)
self.agent_sw_version = "unknown"
self.latest_sysroot_commit = "unknown"
self.subfunctions = []
self.nodetype = "unknown"
self.agent_sw_version = "unknown"
self.agent_state = "unknown"
def decode(self, data):
messages.PatchMessage.decode(self, data)
if 'latest_sysroot_commit' in data:
self.latest_sysroot_commit = data['latest_sysroot_commit']
if 'nodetype' in data:
self.nodetype = data['nodetype']
if 'sw_version' in data:
self.agent_sw_version = data['sw_version']
if 'subfunctions' in data:
self.subfunctions = data['subfunctions']
if 'state' in data:
self.agent_state = data['state']
def encode(self):
LOG.error("Should not get here")
def handle(self, sock, addr):
global pc
ip = addr[0]
pc.hosts_lock.acquire()
if ip in pc.hosts:
pc.hosts[ip].handle_query_detailed_resp(self.latest_sysroot_commit,
self.nodetype,
self.agent_sw_version,
self.subfunctions,
self.agent_state)
for patch_id in list(pc.interim_state):
if ip in pc.interim_state[patch_id]:
pc.interim_state[patch_id].remove(ip)
if len(pc.interim_state[patch_id]) == 0:
del pc.interim_state[patch_id]
pc.hosts_lock.release()
# CentOS code has an additional call here for
# check_patch_states which was removed in Debian
# The call had to be removed since it was mangling
# pc.allow_insvc_patching value and making all
# patches treat like an in-service patch.
else:
pc.hosts_lock.release()
def send(self, sock): # pylint: disable=unused-argument
LOG.error("Should not get here")
class PatchMessageAgentInstallReq(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_AGENT_INSTALL_REQ)
self.ip = None
self.force = False
def encode(self):
global pc
messages.PatchMessage.encode(self)
self.message['force'] = self.force
def handle(self, sock, addr):
LOG.error("Should not get here")
def send(self, sock):
LOG.info("sending install request to node: %s", self.ip)
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (self.ip, cfg.agent_port))
class PatchMessageAgentInstallResp(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_AGENT_INSTALL_RESP)
self.status = False
self.reject_reason = None
def decode(self, data):
messages.PatchMessage.decode(self, data)
if 'status' in data:
self.status = data['status']
if 'reject_reason' in data:
self.reject_reason = data['reject_reason']
def encode(self):
# Nothing to add, so just call the super class
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
LOG.info("Handling install resp from %s", addr[0])
global pc
# LOG.info("Handling hello ack")
pc.hosts_lock.acquire()
if not addr[0] in pc.hosts:
pc.hosts[addr[0]] = AgentNeighbour(addr[0])
pc.hosts[addr[0]].install_status = self.status
pc.hosts[addr[0]].install_pending = False
pc.hosts[addr[0]].install_reject_reason = self.reject_reason
pc.hosts_lock.release()
def send(self, sock): # pylint: disable=unused-argument
LOG.error("Should not get here")
class PatchMessageDropHostReq(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_DROP_HOST_REQ)
self.ip = None
def encode(self):
messages.PatchMessage.encode(self)
self.message['ip'] = self.ip
def decode(self, data):
messages.PatchMessage.decode(self, data)
if 'ip' in data:
self.ip = data['ip']
def handle(self, sock, addr):
global pc
host = addr[0]
if host == cfg.get_mgmt_ip():
# Ignore messages from self
return
if self.ip is None:
LOG.error("Received PATCHMSG_DROP_HOST_REQ with no ip: %s", json.dumps(self.data))
return
pc.drop_host(self.ip, sync_nbr=False)
return
def send(self, sock):
global pc
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
class PatchController(PatchService):
def __init__(self):
PatchService.__init__(self)
# Locks
self.socket_lock = threading.RLock()
self.controller_neighbours_lock = threading.RLock()
self.hosts_lock = threading.RLock()
self.patch_data_lock = threading.RLock()
self.hosts = {}
self.controller_neighbours = {}
# interim_state is used to track hosts that have not responded
# with fresh queries since a patch was applied or removed, on
# a per-patch basis. This allows the patch controller to move
# patches immediately into a "Partial" state until all nodes
# have responded.
#
self.interim_state = {}
self.sock_out = None
self.sock_in = None
self.controller_address = None
self.agent_address = None
self.patch_op_counter = 1
self.patch_data = PatchData()
self.patch_data.load_all()
try:
self.latest_feed_commit = ostree_utils.get_feed_latest_commit(SW_VERSION)
except OSTreeCommandFail:
LOG.exception("Failure to fetch the feed ostree latest log while "
"initializing Patch Controller")
self.latest_feed_commit = None
self.check_patch_states()
self.base_pkgdata = BasePackageData()
self.allow_insvc_patching = True
if os.path.exists(app_dependency_filename):
try:
with open(app_dependency_filename, 'r') as f:
self.app_dependencies = json.loads(f.read())
except Exception:
LOG.exception("Failed to read app dependencies: %s", app_dependency_filename)
else:
self.app_dependencies = {}
if os.path.isfile(state_file):
self.read_state_file()
else:
self.write_state_file()
def update_config(self):
cfg.read_config()
if self.port != cfg.controller_port:
self.port = cfg.controller_port
# Loopback interface does not support multicast messaging, therefore
# revert to using unicast messaging when configured against the
# loopback device
if cfg.get_mgmt_iface() == constants.LOOPBACK_INTERFACE_NAME:
mgmt_ip = cfg.get_mgmt_ip()
self.mcast_addr = None
self.controller_address = mgmt_ip
self.agent_address = mgmt_ip
else:
self.mcast_addr = cfg.controller_mcast_group
self.controller_address = cfg.controller_mcast_group
self.agent_address = cfg.agent_mcast_group
def socket_lock_acquire(self):
self.socket_lock.acquire()
def socket_lock_release(self):
try:
self.socket_lock.release()
except Exception:
pass
def write_state_file(self):
if six.PY2:
config = configparser.ConfigParser()
elif six.PY3:
config = configparser.ConfigParser(strict=False)
cfgfile = open(state_file, 'w')
config.add_section('runtime')
config.set('runtime', 'patch_op_counter', str(self.patch_op_counter))
config.write(cfgfile)
cfgfile.close()
def read_state_file(self):
if six.PY2:
config = configparser.ConfigParser()
elif six.PY3:
config = configparser.ConfigParser(strict=False)
config.read(state_file)
try:
counter = config.getint('runtime', 'patch_op_counter')
self.patch_op_counter = counter
LOG.info("patch_op_counter is: %d", self.patch_op_counter)
except configparser.Error:
LOG.exception("Failed to read state info")
def handle_nbr_patch_op_counter(self, host, nbr_patch_op_counter):
if self.patch_op_counter >= nbr_patch_op_counter:
return
self.sync_from_nbr(host)
def sync_from_nbr(self, host):
# Sync the patching repo
host_url = utils.ip_to_url(host)
try:
output = subprocess.check_output(["rsync",
"-acv",
"--delete",
"--exclude", "tmp",
"rsync://%s/patching/" % host_url,
"%s/" % patch_dir],
stderr=subprocess.STDOUT)
LOG.info("Synced to mate patching via rsync: %s", output)
except subprocess.CalledProcessError as e:
LOG.error("Failed to rsync: %s", e.output)
return False
try:
output = subprocess.check_output(["rsync",
"-acv",
"--delete",
"rsync://%s/repo/" % host_url,
"%s/" % repo_root_dir],
stderr=subprocess.STDOUT)
LOG.info("Synced to mate repo via rsync: %s", output)
except subprocess.CalledProcessError:
LOG.error("Failed to rsync: %s", output)
return False
self.read_state_file()
self.patch_data_lock.acquire()
self.hosts_lock.acquire()
self.interim_state = {}
self.patch_data.load_all()
self.check_patch_states()
self.hosts_lock.release()
if os.path.exists(app_dependency_filename):
try:
with open(app_dependency_filename, 'r') as f:
self.app_dependencies = json.loads(f.read())
except Exception:
LOG.exception("Failed to read app dependencies: %s", app_dependency_filename)
else:
self.app_dependencies = {}
self.patch_data_lock.release()
return True
def inc_patch_op_counter(self):
self.patch_op_counter += 1
self.write_state_file()
def check_patch_states(self):
# If we have no hosts, we can't be sure of the current patch state
if len(self.hosts) == 0:
for patch_id in self.patch_data.metadata:
self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN
return
# Default to allowing in-service patching
self.allow_insvc_patching = True
# Take the detailed query results from the hosts and merge with the patch data
self.hosts_lock.acquire()
# Initialize patch state data based on repo state and interim_state presence
for patch_id in self.patch_data.metadata:
if patch_id in self.interim_state:
if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE:
self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE
elif self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED:
self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY
if self.patch_data.metadata[patch_id].get("reboot_required") != "N":
self.allow_insvc_patching = False
else:
self.patch_data.metadata[patch_id]["patchstate"] = \
self.patch_data.metadata[patch_id]["repostate"]
self.hosts_lock.release()
def get_store_filename(self, patch_sw_version, contentname):
"""Returns the path of a content file from the store"""
content_dir = package_dir[patch_sw_version]
contentfile = "%s/%s" % (content_dir, contentname)
return contentfile
def get_ostree_tar_filename(self, patch_sw_version, patch_id):
'''
Returns the path of the ostree tarball
:param patch_sw_version: sw version this patch must be applied to
:param patch_id: The patch ID
'''
ostree_tar_dir = package_dir[patch_sw_version]
ostree_tar_filename = "%s/%s-software.tar" % (ostree_tar_dir, patch_id)
return ostree_tar_filename
def get_repo_filename(self, patch_sw_version, contentname):
contentfile = self.get_store_filename(patch_sw_version, contentname)
if not os.path.isfile(contentfile):
msg = "Could not find content: %s" % contentfile
LOG.error(msg)
return None
# OSTREE: need to determine the actual path for the content ie: Content
repo_filename = "%s/Content/%s" % (repo_dir[patch_sw_version], contentname)
return repo_filename
def run_semantic_check(self, action, patch_list):
if not os.path.exists(INITIAL_CONFIG_COMPLETE_FLAG):
# Skip semantic checks if initial configuration isn't complete
return
# Pass the current patch state to the semantic check as a series of args
patch_state_args = []
for patch_id in list(self.patch_data.metadata):
patch_state = '%s=%s' % (patch_id, self.patch_data.metadata[patch_id]["patchstate"])
patch_state_args += ['-p', patch_state]
# Run semantic checks, if any
for patch_id in patch_list:
semchk = os.path.join(semantics_dir, action, patch_id)
if os.path.exists(semchk):
try:
LOG.info("Running semantic check: %s", semchk)
subprocess.check_output([semchk] + patch_state_args,
stderr=subprocess.STDOUT)
LOG.info("Semantic check %s passed", semchk)
except subprocess.CalledProcessError as e:
msg = "Semantic check failed for %s:\n%s" % (patch_id, e.output)
LOG.exception(msg)
raise PatchFail(msg)
def patch_import_api(self, patches):
"""
Import patches
:return:
"""
msg_info = ""
msg_warning = ""
msg_error = ""
# Refresh data, if needed
self.base_pkgdata.loaddirs()
# Protect against duplications
patch_list = sorted(list(set(patches)))
# First, make sure the specified files exist
for patch in patch_list:
if not os.path.isfile(patch):
raise PatchFail("File does not exist: %s" % patch)
try:
if not os.path.exists(avail_dir):
os.makedirs(avail_dir)
if not os.path.exists(applied_dir):
os.makedirs(applied_dir)
if not os.path.exists(committed_dir):
os.makedirs(committed_dir)
except os.error:
msg = "Failed to create directories"
LOG.exception(msg)
raise PatchFail(msg)
msg = "Importing patches: %s" % ",".join(patch_list)
LOG.info(msg)
audit_log_info(msg)
for patch in patch_list:
msg = "Importing patch: %s" % patch
LOG.info(msg)
audit_log_info(msg)
# Get the patch_id from the filename
# and check to see if it's already imported
(patch_id, ext) = os.path.splitext(os.path.basename(patch))
if patch_id in self.patch_data.metadata:
if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED:
mdir = applied_dir
elif self.patch_data.metadata[patch_id]["repostate"] == constants.COMMITTED:
msg = "%s is committed. Metadata not updated" % patch_id
LOG.info(msg)
msg_info += msg + "\n"
continue
else:
mdir = avail_dir
try:
thispatch = PatchFile.extract_patch(patch,
metadata_dir=mdir,
metadata_only=True,
existing_content=self.patch_data.contents[patch_id],
base_pkgdata=self.base_pkgdata)
self.patch_data.update_patch(thispatch)
msg = "%s is already imported. Updated metadata only" % patch_id
LOG.info(msg)
msg_info += msg + "\n"
except PatchMismatchFailure:
msg = "Contents of %s do not match re-imported patch" % patch_id
LOG.exception(msg)
msg_error += msg + "\n"
continue
except PatchValidationFailure as e:
msg = "Patch validation failed for %s" % patch_id
if str(e) is not None and str(e) != '':
msg += ":\n%s" % str(e)
LOG.exception(msg)
msg_error += msg + "\n"
continue
except PatchFail:
msg = "Failed to import patch %s" % patch_id
LOG.exception(msg)
msg_error += msg + "\n"
continue
if ext != ".patch":
msg = "File must end in .patch extension: %s" \
% os.path.basename(patch)
LOG.exception(msg)
msg_error += msg + "\n"
continue
try:
thispatch = PatchFile.extract_patch(patch,
metadata_dir=avail_dir,
base_pkgdata=self.base_pkgdata)
msg_info += "%s is now available\n" % patch_id
self.patch_data.add_patch(thispatch)
self.patch_data.metadata[patch_id]["repostate"] = constants.AVAILABLE
if len(self.hosts) > 0:
self.patch_data.metadata[patch_id]["patchstate"] = constants.AVAILABLE
else:
self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN
except PatchValidationFailure as e:
msg = "Patch validation failed for %s" % patch_id
if str(e) is not None and str(e) != '':
msg += ":\n%s" % str(e)
LOG.exception(msg)
msg_error += msg + "\n"
continue
except PatchFail:
msg = "Failed to import patch %s" % patch_id
LOG.exception(msg)
msg_error += msg + "\n"
continue
return dict(info=msg_info, warning=msg_warning, error=msg_error)
def patch_apply_api(self, patch_ids, **kwargs):
"""
Apply patches, moving patches from available to applied and updating repo
:return:
"""
msg_info = ""
msg_warning = ""
msg_error = ""
# Protect against duplications
patch_list = sorted(list(set(patch_ids)))
msg = "Applying patches: %s" % ",".join(patch_list)
LOG.info(msg)
audit_log_info(msg)
if "--all" in patch_list:
# Set patch_ids to list of all available patches
# We're getting this list now, before we load the applied patches
patch_list = []
for patch_id in sorted(list(self.patch_data.metadata)):
if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE:
patch_list.append(patch_id)
if len(patch_list) == 0:
msg_info += "There are no available patches to be applied.\n"
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# First, verify that all specified patches exist
id_verification = True
for patch_id in patch_list:
if patch_id not in self.patch_data.metadata:
msg = "Patch %s does not exist" % patch_id
LOG.error(msg)
msg_error += msg + "\n"
id_verification = False
if not id_verification:
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# Check for patches that can't be applied during an upgrade
upgrade_check = True
for patch_id in patch_list:
if self.patch_data.metadata[patch_id]["sw_version"] != SW_VERSION \
and self.patch_data.metadata[patch_id].get("apply_active_release_only") == "Y":
msg = "%s cannot be applied in an upgrade" % patch_id
LOG.error(msg)
msg_error += msg + "\n"
upgrade_check = False
if not upgrade_check:
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# Next, check the patch dependencies
# required_patches will map the required patch to the patches that need it
required_patches = {}
for patch_id in patch_list:
for req_patch in self.patch_data.metadata[patch_id]["requires"]:
# Ignore patches in the op set
if req_patch in patch_list:
continue
if req_patch not in required_patches:
required_patches[req_patch] = []
required_patches[req_patch].append(patch_id)
# Now verify the state of the required patches
req_verification = True
for req_patch, iter_patch_list in required_patches.items():
if req_patch not in self.patch_data.metadata \
or self.patch_data.metadata[req_patch]["repostate"] == constants.AVAILABLE:
msg = "%s is required by: %s" % (req_patch, ", ".join(sorted(iter_patch_list)))
msg_error += msg + "\n"
LOG.info(msg)
req_verification = False
if not req_verification:
return dict(info=msg_info, warning=msg_warning, error=msg_error)
if kwargs.get("skip-semantic") != "yes":
self.run_semantic_check(constants.SEMANTIC_PREAPPLY, patch_list)
# Start applying the patches
for patch_id in patch_list:
msg = "Applying patch: %s" % patch_id
LOG.info(msg)
audit_log_info(msg)
if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED \
or self.patch_data.metadata[patch_id]["repostate"] == constants.COMMITTED:
msg = "%s is already in the repo" % patch_id
LOG.info(msg)
msg_info += msg + "\n"
continue
patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"]
latest_commit = ""
try:
latest_commit = ostree_utils.get_feed_latest_commit(patch_sw_version)
except OSTreeCommandFail:
LOG.exception("Failure during commit consistency check for %s.", patch_id)
if self.patch_data.contents[patch_id]["base"]["commit"] != latest_commit:
msg = "The base commit for %s do not match the latest commit " \
"on this system." % (patch_id)
LOG.info(msg)
msg_info += msg + "\n"
continue
ostree_tar_filename = self.get_ostree_tar_filename(patch_sw_version, patch_id)
# Create a temporary working directory
tmpdir = tempfile.mkdtemp(prefix="patch_")
# Save the current directory, so we can chdir back after
orig_wd = os.getcwd()
# Change to the tmpdir
os.chdir(tmpdir)
try:
# Extract the software.tar
tar = tarfile.open(ostree_tar_filename)
tar.extractall()
feed_ostree = "%s/rel-%s/ostree_repo" % (constants.FEED_OSTREE_BASE_DIR, patch_sw_version)
# Copy extracted folders of software.tar to the feed ostree repo
shutil.copytree(tmpdir, feed_ostree, dirs_exist_ok=True)
except tarfile.TarError:
msg = "Failed to extract the ostree tarball for %s" % patch_id
LOG.exception(msg)
raise OSTreeTarFail(msg)
except shutil.Error:
msg = "Failed to copy the ostree tarball for %s" % patch_id
LOG.exception(msg)
raise OSTreeTarFail(msg)
finally:
# Change back to original working dir
os.chdir(orig_wd)
shutil.rmtree(tmpdir, ignore_errors=True)
try:
# Move the patching metadata from avail to applied dir
shutil.move("%s/%s-metadata.xml" % (avail_dir, patch_id),
"%s/%s-metadata.xml" % (applied_dir, patch_id))
msg_info += "%s is now in the repo\n" % patch_id
except shutil.Error:
msg = "Failed to move the metadata for %s" % patch_id
LOG.exception(msg)
raise MetadataFail(msg)
self.patch_data.metadata[patch_id]["repostate"] = constants.APPLIED
if len(self.hosts) > 0:
self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY
else:
self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN
# Commit1 in patch metadata.xml file represents the latest commit
# after this patch has been applied to the feed repo
self.latest_feed_commit = self.patch_data.contents[patch_id]["commit1"]["commit"]
self.hosts_lock.acquire()
self.interim_state[patch_id] = list(self.hosts)
self.hosts_lock.release()
return dict(info=msg_info, warning=msg_warning, error=msg_error)
def patch_remove_api(self, patch_ids, **kwargs):
"""
Remove patches, moving patches from applied to available and updating repo
:return:
"""
msg_info = ""
msg_warning = ""
msg_error = ""
remove_unremovable = False
# Protect against duplications
patch_list = sorted(list(set(patch_ids)))
msg = "Removing patches: %s" % ",".join(patch_list)
LOG.info(msg)
audit_log_info(msg)
if kwargs.get("removeunremovable") == "yes":
remove_unremovable = True
# First, verify that all specified patches exist
id_verification = True
for patch_id in patch_list:
if patch_id not in self.patch_data.metadata:
msg = "Patch %s does not exist" % patch_id
LOG.error(msg)
msg_error += msg + "\n"
id_verification = False
if not id_verification:
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# See if any of the patches are marked as unremovable
unremovable_verification = True
for patch_id in patch_list:
if self.patch_data.metadata[patch_id].get("unremovable") == "Y":
if remove_unremovable:
msg = "Unremovable patch %s being removed" % patch_id
LOG.warning(msg)
msg_warning += msg + "\n"
else:
msg = "Patch %s is not removable" % patch_id
LOG.error(msg)
msg_error += msg + "\n"
unremovable_verification = False
elif self.patch_data.metadata[patch_id]['repostate'] == constants.COMMITTED:
msg = "Patch %s is committed and cannot be removed" % patch_id
LOG.error(msg)
msg_error += msg + "\n"
unremovable_verification = False
if not unremovable_verification:
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# Next, see if any of the patches are required by applied patches
# required_patches will map the required patch to the patches that need it
required_patches = {}
for patch_iter in list(self.patch_data.metadata):
# Ignore patches in the op set
if patch_iter in patch_list:
continue
# Only check applied patches
if self.patch_data.metadata[patch_iter]["repostate"] == constants.AVAILABLE:
continue
for req_patch in self.patch_data.metadata[patch_iter]["requires"]:
if req_patch not in patch_list:
continue
if req_patch not in required_patches:
required_patches[req_patch] = []
required_patches[req_patch].append(patch_iter)
if len(required_patches) > 0:
for req_patch, iter_patch_list in required_patches.items():
msg = "%s is required by: %s" % (req_patch, ", ".join(sorted(iter_patch_list)))
msg_error += msg + "\n"
LOG.info(msg)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
if kwargs.get("skipappcheck") != "yes":
# Check application dependencies before removing
required_patches = {}
for patch_id in patch_list:
for appname, iter_patch_list in self.app_dependencies.items():
if patch_id in iter_patch_list:
if patch_id not in required_patches:
required_patches[patch_id] = []
required_patches[patch_id].append(appname)
if len(required_patches) > 0:
for req_patch, app_list in required_patches.items():
msg = "%s is required by application(s): %s" % (req_patch, ", ".join(sorted(app_list)))
msg_error += msg + "\n"
LOG.info(msg)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
if kwargs.get("skip-semantic") != "yes":
self.run_semantic_check(constants.SEMANTIC_PREREMOVE, patch_list)
for patch_id in patch_list:
msg = "Removing patch: %s" % patch_id
LOG.info(msg)
audit_log_info(msg)
if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE:
msg = "%s is not in the repo" % patch_id
LOG.info(msg)
msg_info += msg + "\n"
continue
patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"]
# Base commit is fetched from the patch metadata
base_commit = self.patch_data.contents[patch_id]["base"]["commit"]
feed_ostree = "%s/rel-%s/ostree_repo" % (constants.FEED_OSTREE_BASE_DIR, patch_sw_version)
try:
# Reset the ostree HEAD
ostree_utils.reset_ostree_repo_head(base_commit, feed_ostree)
# Delete all commits that belong to this patch
for i in range(int(self.patch_data.contents[patch_id]["number_of_commits"])):
commit_to_delete = self.patch_data.contents[patch_id]["commit%s" % (i + 1)]["commit"]
ostree_utils.delete_ostree_repo_commit(commit_to_delete, feed_ostree)
# Update the feed ostree summary
ostree_utils.update_repo_summary_file(feed_ostree)
except OSTreeCommandFail:
LOG.exception("Failure during patch apply for %s.", patch_id)
try:
# Move the metadata to the available dir
shutil.move("%s/%s-metadata.xml" % (applied_dir, patch_id),
"%s/%s-metadata.xml" % (avail_dir, patch_id))
msg_info += "%s has been removed from the repo\n" % patch_id
except shutil.Error:
msg = "Failed to move the metadata for %s" % patch_id
LOG.exception(msg)
raise MetadataFail(msg)
self.patch_data.metadata[patch_id]["repostate"] = constants.AVAILABLE
if len(self.hosts) > 0:
self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE
else:
self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN
# Base Commit in patch metadata.xml file represents the latest commit
# after this patch has been removed from the feed repo
self.latest_feed_commit = self.patch_data.contents[patch_id]["base"]["commit"]
self.hosts_lock.acquire()
self.interim_state[patch_id] = list(self.hosts)
self.hosts_lock.release()
return dict(info=msg_info, warning=msg_warning, error=msg_error)
def patch_delete_api(self, patch_ids):
"""
Delete patches
:return:
"""
msg_info = ""
msg_warning = ""
msg_error = ""
# Protect against duplications
patch_list = sorted(list(set(patch_ids)))
msg = "Deleting patches: %s" % ",".join(patch_list)
LOG.info(msg)
audit_log_info(msg)
# Verify patches exist and are in proper state first
id_verification = True
for patch_id in patch_list:
if patch_id not in self.patch_data.metadata:
msg = "Patch %s does not exist" % patch_id
LOG.error(msg)
msg_error += msg + "\n"
id_verification = False
continue
# Get the aggregated patch state, if possible
patchstate = constants.UNKNOWN
if patch_id in self.patch_data.metadata:
patchstate = self.patch_data.metadata[patch_id]["patchstate"]
if self.patch_data.metadata[patch_id]["repostate"] != constants.AVAILABLE or \
(patchstate != constants.AVAILABLE and patchstate != constants.UNKNOWN):
msg = "Patch %s not in Available state" % patch_id
LOG.error(msg)
msg_error += msg + "\n"
id_verification = False
continue
if not id_verification:
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# Handle operation
for patch_id in patch_list:
patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"]
ostree_tar_filename = self.get_ostree_tar_filename(patch_sw_version, patch_id)
if not os.path.isfile(ostree_tar_filename):
# We're deleting the patch anyway, so the missing file
# doesn't really matter
continue
try:
os.remove(ostree_tar_filename)
except OSError:
msg = "Failed to remove ostree tarball %s" % ostree_tar_filename
LOG.exception(msg)
raise OSTreeTarFail(msg)
try:
# Delete the metadata
os.remove("%s/%s-metadata.xml" % (avail_dir, patch_id))
except OSError:
msg = "Failed to remove metadata for %s" % patch_id
LOG.exception(msg)
raise MetadataFail(msg)
self.patch_data.delete_patch(patch_id)
msg = "%s has been deleted" % patch_id
LOG.info(msg)
msg_info += msg + "\n"
return dict(info=msg_info, warning=msg_warning, error=msg_error)
def patch_init_release_api(self, release):
"""
Create an empty repo for a new release
:return:
"""
msg_info = ""
msg_warning = ""
msg_error = ""
msg = "Initializing repo for: %s" % release
LOG.info(msg)
audit_log_info(msg)
if release == SW_VERSION:
msg = "Rejected: Requested release %s is running release" % release
msg_error += msg + "\n"
LOG.info(msg)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# Refresh data
self.base_pkgdata.loaddirs()
self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE)
self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED)
self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED)
repo_dir[release] = "%s/rel-%s" % (repo_root_dir, release)
# Verify the release doesn't already exist
if os.path.exists(repo_dir[release]):
msg = "Patch repository for %s already exists" % release
msg_info += msg + "\n"
LOG.info(msg)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# Create the repo
try:
# todo(jcasteli) determine if ostree change needs a createrepo equivalent
output = "UNDER CONSTRUCTION for OSTREE"
LOG.info("Repo[%s] updated:\n%s", release, output)
except Exception:
msg = "Failed to update the repo for %s" % release
LOG.exception(msg)
# Wipe out what was created
shutil.rmtree(repo_dir[release])
del repo_dir[release]
raise PatchFail(msg)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
def patch_del_release_api(self, release):
"""
Delete the repo and patches for second release
:return:
"""
msg_info = ""
msg_warning = ""
msg_error = ""
msg = "Deleting repo and patches for: %s" % release
LOG.info(msg)
audit_log_info(msg)
if release == SW_VERSION:
msg = "Rejected: Requested release %s is running release" % release
msg_error += msg + "\n"
LOG.info(msg)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# Delete patch XML files
for patch_id in list(self.patch_data.metadata):
if self.patch_data.metadata[patch_id]["sw_version"] != release:
continue
if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED:
mdir = applied_dir
elif self.patch_data.metadata[patch_id]["repostate"] == constants.COMMITTED:
mdir = committed_dir
else:
mdir = avail_dir
for action in constants.SEMANTIC_ACTIONS:
action_file = os.path.join(semantics_dir, action, patch_id)
if not os.path.isfile(action_file):
continue
try:
os.remove(action_file)
except OSError:
msg = "Failed to remove semantic %s" % action_file
LOG.exception(msg)
raise SemanticFail(msg)
try:
# Delete the metadata
os.remove("%s/%s-metadata.xml" % (mdir, patch_id))
except OSError:
msg = "Failed to remove metadata for %s" % patch_id
LOG.exception(msg)
# Refresh patch data
self.patch_data = PatchData()
self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE)
self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED)
self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED)
raise MetadataFail(msg)
# Delete the packages dir
package_dir[release] = "%s/%s" % (root_package_dir, release)
if os.path.exists(package_dir[release]):
try:
shutil.rmtree(package_dir[release])
except shutil.Error:
msg = "Failed to delete package dir for %s" % release
LOG.exception(msg)
del package_dir[release]
# Verify the release exists
repo_dir[release] = "%s/rel-%s" % (repo_root_dir, release)
if not os.path.exists(repo_dir[release]):
# Nothing to do
msg = "Patch repository for %s does not exist" % release
msg_info += msg + "\n"
LOG.info(msg)
del repo_dir[release]
# Refresh patch data
self.patch_data = PatchData()
self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE)
self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED)
self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# Delete the repo
try:
shutil.rmtree(repo_dir[release])
except shutil.Error:
msg = "Failed to delete repo for %s" % release
LOG.exception(msg)
del repo_dir[release]
if self.base_pkgdata is not None and release in self.base_pkgdata.pkgs:
del self.base_pkgdata.pkgs[release]
# Refresh patch data
self.patch_data = PatchData()
self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE)
self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED)
self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
def patch_query_what_requires(self, patch_ids):
"""
Query the known patches to see which have dependencies on the specified patches
:return:
"""
msg_info = ""
msg_warning = ""
msg_error = ""
msg = "Querying what requires patches: %s" % ",".join(patch_ids)
LOG.info(msg)
audit_log_info(msg)
# First, verify that all specified patches exist
id_verification = True
for patch_id in patch_ids:
if patch_id not in self.patch_data.metadata:
msg = "Patch %s does not exist" % patch_id
LOG.error(msg)
msg_error += msg + "\n"
id_verification = False
if not id_verification:
return dict(info=msg_info, warning=msg_warning, error=msg_error)
required_patches = {}
for patch_iter in list(self.patch_data.metadata):
for req_patch in self.patch_data.metadata[patch_iter]["requires"]:
if req_patch not in patch_ids:
continue
if req_patch not in required_patches:
required_patches[req_patch] = []
required_patches[req_patch].append(patch_iter)
for patch_id in patch_ids:
if patch_id in required_patches:
iter_patch_list = required_patches[patch_id]
msg_info += "%s is required by: %s\n" % (patch_id, ", ".join(sorted(iter_patch_list)))
else:
msg_info += "%s is not required by any patches.\n" % patch_id
return dict(info=msg_info, warning=msg_warning, error=msg_error)
def send_latest_feed_commit_to_agent(self):
"""
Notify the patch agent that the latest commit on the feed
repo has been updated
"""
# Skip sending messages if host not yet provisioned
if self.sock_out is None:
LOG.info("Skipping send feed commit to agent")
return
send_commit_to_agent = PatchMessageSendLatestFeedCommit()
self.socket_lock.acquire()
send_commit_to_agent.send(self.sock_out)
self.socket_lock.release()
def patch_sync(self):
# Increment the patch_op_counter here
self.inc_patch_op_counter()
self.patch_data_lock.acquire()
# self.patch_data.load_all()
self.check_patch_states()
self.patch_data_lock.release()
if self.sock_out is None:
return True
# Send the sync requests
self.controller_neighbours_lock.acquire()
for n in self.controller_neighbours:
self.controller_neighbours[n].clear_synced()
self.controller_neighbours_lock.release()
msg = PatchMessageSyncReq()
self.socket_lock.acquire()
msg.send(self.sock_out)
self.socket_lock.release()
# Now we wait, up to two mins. future enhancement: Wait on a condition
my_ip = cfg.get_mgmt_ip()
sync_rc = False
max_time = time.time() + 120
while time.time() < max_time:
all_done = True
self.controller_neighbours_lock.acquire()
for n in self.controller_neighbours:
if n != my_ip and not self.controller_neighbours[n].get_synced():
all_done = False
self.controller_neighbours_lock.release()
if all_done:
LOG.info("Sync complete")
sync_rc = True
break
time.sleep(0.5)
# Send hellos to the hosts now, to get queries performed
hello_agent = PatchMessageHelloAgent()
self.socket_lock.acquire()
hello_agent.send(self.sock_out)
self.socket_lock.release()
if not sync_rc:
LOG.info("Timed out waiting for sync completion")
return sync_rc
def patch_query_cached(self, **kwargs):
query_state = None
if "show" in kwargs:
if kwargs["show"] == "available":
query_state = constants.AVAILABLE
elif kwargs["show"] == "applied":
query_state = constants.APPLIED
elif kwargs["show"] == "committed":
query_state = constants.COMMITTED
query_release = None
if "release" in kwargs:
query_release = kwargs["release"]
results = {}
self.patch_data_lock.acquire()
if query_state is None and query_release is None:
# Return everything
results = self.patch_data.metadata
else:
# Filter results
for patch_id, data in self.patch_data.metadata.items():
if query_state is not None and data["repostate"] != query_state:
continue
if query_release is not None and data["sw_version"] != query_release:
continue
results[patch_id] = data
self.patch_data_lock.release()
return results
def patch_query_specific_cached(self, patch_ids):
audit_log_info("Patch show")
results = {"metadata": {},
"contents": {},
"error": ""}
self.patch_data_lock.acquire()
for patch_id in patch_ids:
if patch_id not in list(self.patch_data.metadata):
results["error"] += "%s is unrecognized\n" % patch_id
for patch_id, data in self.patch_data.metadata.items():
if patch_id in patch_ids:
results["metadata"][patch_id] = data
for patch_id, data in self.patch_data.contents.items():
if patch_id in patch_ids:
results["contents"][patch_id] = data
self.patch_data_lock.release()
return results
def get_dependencies(self, patch_ids, recursive):
dependencies = set()
patch_added = False
self.patch_data_lock.acquire()
# Add patches to workset
for patch_id in sorted(patch_ids):
dependencies.add(patch_id)
patch_added = True
while patch_added:
patch_added = False
for patch_id in sorted(dependencies):
for req in self.patch_data.metadata[patch_id]["requires"]:
if req not in dependencies:
dependencies.add(req)
patch_added = recursive
self.patch_data_lock.release()
return sorted(dependencies)
def patch_query_dependencies(self, patch_ids, **kwargs):
msg = "Patch query-dependencies %s" % patch_ids
LOG.info(msg)
audit_log_info(msg)
failure = False
results = {"patches": [],
"error": ""}
recursive = False
if kwargs.get("recursive") == "yes":
recursive = True
self.patch_data_lock.acquire()
# Verify patch IDs
for patch_id in sorted(patch_ids):
if patch_id not in list(self.patch_data.metadata):
errormsg = "%s is unrecognized\n" % patch_id
LOG.info("patch_query_dependencies: %s", errormsg)
results["error"] += errormsg
failure = True
self.patch_data_lock.release()
if failure:
LOG.info("patch_query_dependencies failed")
return results
results["patches"] = self.get_dependencies(patch_ids, recursive)
return results
def patch_commit(self, patch_ids, dry_run=False):
msg = "Patch commit %s" % patch_ids
LOG.info(msg)
audit_log_info(msg)
try:
if not os.path.exists(committed_dir):
os.makedirs(committed_dir)
except os.error:
msg = "Failed to create %s" % committed_dir
LOG.exception(msg)
raise PatchFail(msg)
failure = False
recursive = True
keep = {}
cleanup = {}
cleanup_files = set()
results = {"info": "",
"error": ""}
# Ensure there are only REL patches
non_rel_list = []
self.patch_data_lock.acquire()
for patch_id in self.patch_data.metadata:
if self.patch_data.metadata[patch_id]['status'] != constants.STATUS_RELEASED:
non_rel_list.append(patch_id)
self.patch_data_lock.release()
if len(non_rel_list) > 0:
errormsg = "A commit cannot be performed with non-REL status patches in the system:\n"
for patch_id in non_rel_list:
errormsg += " %s\n" % patch_id
LOG.info("patch_commit rejected: %s", errormsg)
results["error"] += errormsg
return results
# Verify patch IDs
self.patch_data_lock.acquire()
for patch_id in sorted(patch_ids):
if patch_id not in list(self.patch_data.metadata):
errormsg = "%s is unrecognized\n" % patch_id
LOG.info("patch_commit: %s", errormsg)
results["error"] += errormsg
failure = True
self.patch_data_lock.release()
if failure:
LOG.info("patch_commit: Failed patch ID check")
return results
commit_list = self.get_dependencies(patch_ids, recursive)
# Check patch states
avail_list = []
self.patch_data_lock.acquire()
for patch_id in commit_list:
if self.patch_data.metadata[patch_id]['patchstate'] != constants.APPLIED \
and self.patch_data.metadata[patch_id]['patchstate'] != constants.COMMITTED:
avail_list.append(patch_id)
self.patch_data_lock.release()
if len(avail_list) > 0:
errormsg = "The following patches are not applied and cannot be committed:\n"
for patch_id in avail_list:
errormsg += " %s\n" % patch_id
LOG.info("patch_commit rejected: %s", errormsg)
results["error"] += errormsg
return results
# Get list of packages
self.patch_data_lock.acquire()
for patch_id in commit_list:
patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"]
if patch_sw_version not in keep:
keep[patch_sw_version] = {}
if patch_sw_version not in cleanup:
cleanup[patch_sw_version] = {}
self.patch_data_lock.release()
# Calculate disk space
disk_space = 0
for rpmfile in cleanup_files:
statinfo = os.stat(rpmfile)
disk_space += statinfo.st_size
if dry_run:
results["info"] = "This commit operation would free %0.2f MiB" % (disk_space / (1024.0 * 1024.0))
return results
# Do the commit
# Move the metadata to the committed dir
for patch_id in commit_list:
metadata_fname = "%s-metadata.xml" % patch_id
applied_fname = os.path.join(applied_dir, metadata_fname)
committed_fname = os.path.join(committed_dir, metadata_fname)
if os.path.exists(applied_fname):
try:
shutil.move(applied_fname, committed_fname)
except shutil.Error:
msg = "Failed to move the metadata for %s" % patch_id
LOG.exception(msg)
raise MetadataFail(msg)
# Delete the files
for contentfile in cleanup_files:
try:
os.remove(contentfile)
except OSError:
msg = "Failed to remove: %s" % contentfile
LOG.exception(msg)
raise MetadataFail(msg)
# OSTREE: this repo update code needs to be re-examined
for ver, rdir in repo_dir.items():
try:
# todo(jcasteli) determine if ostree change needs additional actions
# old code was calling 'createrepo' for rpms
output = "OSTREE determined a change occurred rdir=%s" % rdir
LOG.info("Repo[%s] updated:\n%s", ver, output)
except Exception:
msg = "Failed to update the repo for %s" % ver
LOG.exception(msg)
raise PatchFail(msg)
self.patch_data.load_all()
results["info"] = "The patches have been committed."
return results
def query_host_cache(self):
output = []
self.hosts_lock.acquire()
for nbr in list(self.hosts):
host = self.hosts[nbr].get_dict()
host["interim_state"] = False
for patch_id in list(pc.interim_state):
if nbr in pc.interim_state[patch_id]:
host["interim_state"] = True
output.append(host)
self.hosts_lock.release()
return output
def any_patch_host_installing(self):
rc = False
self.hosts_lock.acquire()
for host in self.hosts.values():
if host.state == constants.PATCH_AGENT_STATE_INSTALLING:
rc = True
break
self.hosts_lock.release()
return rc
def patch_host_install(self, host_ip, force, async_req=False):
msg_info = ""
msg_warning = ""
msg_error = ""
ip = host_ip
self.hosts_lock.acquire()
# If not in hosts table, maybe a hostname was used instead
if host_ip not in self.hosts:
try:
ip = utils.gethostbyname(host_ip)
if ip not in self.hosts:
# Translated successfully, but IP isn't in the table.
# Raise an exception to drop out to the failure handling
raise PatchError("Host IP (%s) not in table" % ip)
except Exception:
self.hosts_lock.release()
msg = "Unknown host specified: %s" % host_ip
msg_error += msg + "\n"
LOG.error("Error in host-install: %s", msg)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
msg = "Running host-install for %s (%s), force=%s, async_req=%s" % (host_ip, ip, force, async_req)
LOG.info(msg)
audit_log_info(msg)
if self.allow_insvc_patching:
LOG.info("Allowing in-service patching")
force = True
self.hosts[ip].install_pending = True
self.hosts[ip].install_status = False
self.hosts[ip].install_reject_reason = None
self.hosts_lock.release()
installreq = PatchMessageAgentInstallReq()
installreq.ip = ip
installreq.force = force
installreq.encode()
self.socket_lock.acquire()
installreq.send(self.sock_out)
self.socket_lock.release()
if async_req:
# async_req install requested, so return now
msg = "Patch installation request sent to %s." % self.hosts[ip].hostname
msg_info += msg + "\n"
LOG.info("host-install async_req: %s", msg)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
# Now we wait, up to ten mins. future enhancement: Wait on a condition
resp_rx = False
max_time = time.time() + 600
while time.time() < max_time:
self.hosts_lock.acquire()
if ip not in self.hosts:
# The host aged out while we were waiting
self.hosts_lock.release()
msg = "Agent expired while waiting: %s" % ip
msg_error += msg + "\n"
LOG.error("Error in host-install: %s", msg)
break
if not self.hosts[ip].install_pending:
# We got a response
resp_rx = True
if self.hosts[ip].install_status:
msg = "Patch installation was successful on %s." % self.hosts[ip].hostname
msg_info += msg + "\n"
LOG.info("host-install: %s", msg)
elif self.hosts[ip].install_reject_reason:
msg = "Patch installation rejected by %s. %s" % (
self.hosts[ip].hostname,
self.hosts[ip].install_reject_reason)
msg_error += msg + "\n"
LOG.error("Error in host-install: %s", msg)
else:
msg = "Patch installation failed on %s." % self.hosts[ip].hostname
msg_error += msg + "\n"
LOG.error("Error in host-install: %s", msg)
self.hosts_lock.release()
break
self.hosts_lock.release()
time.sleep(0.5)
if not resp_rx:
msg = "Timeout occurred while waiting response from %s." % ip
msg_error += msg + "\n"
LOG.error("Error in host-install: %s", msg)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
def drop_host(self, host_ip, sync_nbr=True):
msg_info = ""
msg_warning = ""
msg_error = ""
ip = host_ip
self.hosts_lock.acquire()
# If not in hosts table, maybe a hostname was used instead
if host_ip not in self.hosts:
try:
# Because the host may be getting dropped due to deletion,
# we may be unable to do a hostname lookup. Instead, we'll
# iterate through the table here.
for host in list(self.hosts):
if host_ip == self.hosts[host].hostname:
ip = host
break
if ip not in self.hosts:
# Translated successfully, but IP isn't in the table.
# Raise an exception to drop out to the failure handling
raise PatchError("Host IP (%s) not in table" % ip)
except Exception:
self.hosts_lock.release()
msg = "Unknown host specified: %s" % host_ip
msg_error += msg + "\n"
LOG.error("Error in drop-host: %s", msg)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
msg = "Running drop-host for %s (%s)" % (host_ip, ip)
LOG.info(msg)
audit_log_info(msg)
del self.hosts[ip]
for patch_id in list(self.interim_state):
if ip in self.interim_state[patch_id]:
self.interim_state[patch_id].remove(ip)
self.hosts_lock.release()
if sync_nbr:
sync_msg = PatchMessageDropHostReq()
sync_msg.ip = ip
self.socket_lock.acquire()
sync_msg.send(self.sock_out)
self.socket_lock.release()
return dict(info=msg_info, warning=msg_warning, error=msg_error)
def is_applied(self, patch_ids):
all_applied = True
self.patch_data_lock.acquire()
for patch_id in patch_ids:
if patch_id not in self.patch_data.metadata:
all_applied = False
break
if self.patch_data.metadata[patch_id]["patchstate"] != constants.APPLIED:
all_applied = False
break
self.patch_data_lock.release()
return all_applied
def is_available(self, patch_ids):
all_available = True
self.patch_data_lock.acquire()
for patch_id in patch_ids:
if patch_id not in self.patch_data.metadata:
all_available = False
break
if self.patch_data.metadata[patch_id]["patchstate"] != \
constants.AVAILABLE:
all_available = False
break
self.patch_data_lock.release()
return all_available
def report_app_dependencies(self, patch_ids, **kwargs):
"""
Handle report of application dependencies
"""
if "app" not in kwargs:
raise PatchInvalidRequest
appname = kwargs.get("app")
LOG.info("Handling app dependencies report: app=%s, patch_ids=%s",
appname, ','.join(patch_ids))
self.patch_data_lock.acquire()
if len(patch_ids) == 0:
if appname in self.app_dependencies:
del self.app_dependencies[appname]
else:
self.app_dependencies[appname] = patch_ids
try:
tmpfile, tmpfname = tempfile.mkstemp(
prefix=app_dependency_basename,
dir=constants.PATCH_STORAGE_DIR)
os.write(tmpfile, json.dumps(self.app_dependencies))
os.close(tmpfile)
os.rename(tmpfname, app_dependency_filename)
except Exception:
LOG.exception("Failed in report_app_dependencies")
raise PatchFail("Internal failure")
finally:
self.patch_data_lock.release()
return True
def query_app_dependencies(self):
"""
Query application dependencies
"""
self.patch_data_lock.acquire()
data = self.app_dependencies
self.patch_data_lock.release()
return dict(data)
# The wsgiref.simple_server module has an error handler that catches
# and prints any exceptions that occur during the API handling to stderr.
# This means the patching sys.excepthook handler that logs uncaught
# exceptions is never called, and those exceptions are lost.
#
# To get around this, we're subclassing the simple_server.ServerHandler
# in order to replace the handle_error method with a custom one that
# logs the exception instead, and will set a global flag to shutdown
# the server and reset.
#
class MyServerHandler(simple_server.ServerHandler):
def handle_error(self):
LOG.exception('An uncaught exception has occurred:')
if not self.headers_sent:
self.result = self.error_output(self.environ, self.start_response)
self.finish_response()
global keep_running
keep_running = False
def get_handler_cls():
cls = simple_server.WSGIRequestHandler
# old-style class doesn't support super
class MyHandler(cls, object):
def address_string(self):
# In the future, we could provide a config option to allow reverse DNS lookup
return self.client_address[0]
# Overload the handle function to use our own MyServerHandler
def handle(self):
"""Handle a single HTTP request"""
self.raw_requestline = self.rfile.readline()
if not self.parse_request(): # An error code has been sent, just exit
return
handler = MyServerHandler(
self.rfile, self.wfile, self.get_stderr(), self.get_environ()
)
handler.request_handler = self # pylint: disable=attribute-defined-outside-init
handler.run(self.server.get_app())
return MyHandler
class PatchControllerApiThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.wsgi = None
def run(self):
host = "127.0.0.1"
port = cfg.api_port
try:
# In order to support IPv6, server_class.address_family must be
# set to the correct address family. Because the unauthenticated
# API always uses IPv4 for the loopback address, the address_family
# variable cannot be set directly in the WSGIServer class, so a
# local subclass needs to be created for the call to make_server,
# where the correct address_family can be specified.
class server_class(simple_server.WSGIServer):
pass
server_class.address_family = socket.AF_INET
self.wsgi = simple_server.make_server(
host, port,
app.VersionSelectorApplication(),
server_class=server_class,
handler_class=get_handler_cls())
self.wsgi.socket.settimeout(api_socket_timeout)
global keep_running
while keep_running:
self.wsgi.handle_request()
# Call garbage collect after wsgi request is handled,
# to ensure any open file handles are closed in the case
# of an upload.
gc.collect()
except Exception:
# Log all exceptions
LOG.exception("Error occurred during request processing")
global thread_death
thread_death.set()
def kill(self):
# Must run from other thread
if self.wsgi is not None:
self.wsgi.shutdown()
class PatchControllerAuthApiThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
# LOG.info ("Initializing Authenticated API thread")
self.wsgi = None
def run(self):
host = CONF.auth_api_bind_ip
port = CONF.auth_api_port
if host is None:
host = utils.get_versioned_address_all()
try:
# Can only launch authenticated server post-config
while not os.path.exists('/etc/platform/.initial_config_complete'):
time.sleep(5)
# In order to support IPv6, server_class.address_family must be
# set to the correct address family. Because the unauthenticated
# API always uses IPv4 for the loopback address, the address_family
# variable cannot be set directly in the WSGIServer class, so a
# local subclass needs to be created for the call to make_server,
# where the correct address_family can be specified.
class server_class(simple_server.WSGIServer):
pass
server_class.address_family = utils.get_management_family()
self.wsgi = simple_server.make_server(
host, port,
auth_app.VersionSelectorApplication(),
server_class=server_class,
handler_class=get_handler_cls())
# self.wsgi.serve_forever()
self.wsgi.socket.settimeout(api_socket_timeout)
global keep_running
while keep_running:
self.wsgi.handle_request()
# Call garbage collect after wsgi request is handled,
# to ensure any open file handles are closed in the case
# of an upload.
gc.collect()
except Exception:
# Log all exceptions
LOG.exception("Authorized API failure: Error occurred during request processing")
def kill(self):
# Must run from other thread
if self.wsgi is not None:
self.wsgi.shutdown()
class PatchControllerMainThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
# LOG.info ("Initializing Main thread")
def run(self):
global pc
global thread_death
# LOG.info ("In Main thread")
try:
sock_in = pc.setup_socket()
while sock_in is None:
# Check every thirty seconds?
# Once we've got a conf file, tied into packstack,
# we'll get restarted when the file is updated,
# and this should be unnecessary.
time.sleep(30)
sock_in = pc.setup_socket()
# Ok, now we've got our socket. Let's start with a hello!
pc.socket_lock.acquire()
hello = PatchMessageHello()
hello.send(pc.sock_out)
hello_agent = PatchMessageHelloAgent()
hello_agent.send(pc.sock_out)
pc.socket_lock.release()
# Send hello every thirty seconds
hello_timeout = time.time() + 30.0
remaining = 30
agent_query_conns = []
while True:
# Check to see if any other thread has died
if thread_death.is_set():
LOG.info("Detected thread death. Terminating")
return
# Check for in-service patch restart flag
if os.path.exists(insvc_patch_restart_controller):
LOG.info("In-service patch restart flag detected. Exiting.")
global keep_running
keep_running = False
os.remove(insvc_patch_restart_controller)
return
inputs = [pc.sock_in] + agent_query_conns
outputs = []
# LOG.info("Running select, remaining=%d", remaining)
rlist, wlist, xlist = select.select(inputs, outputs, inputs, remaining)
if (len(rlist) == 0 and
len(wlist) == 0 and
len(xlist) == 0):
# Timeout hit
pc.audit_socket()
# LOG.info("Checking sockets")
for s in rlist:
data = ''
addr = None
msg = None
if s == pc.sock_in:
# Receive from UDP
pc.socket_lock.acquire()
data, addr = s.recvfrom(1024)
pc.socket_lock.release()
else:
# Receive from TCP
while True:
try:
packet = s.recv(1024)
except socket.error:
LOG.exception("Socket error on recv")
data = ''
break
if packet:
data += packet.decode()
if data == '':
break
try:
json.loads(data)
break
except ValueError:
# Message is incomplete
continue
else:
LOG.info('End of TCP message received')
break
if data == '':
# Connection dropped
agent_query_conns.remove(s)
s.close()
continue
# Get the TCP endpoint address
addr = s.getpeername()
msgdata = json.loads(data)
# For now, discard any messages that are not msgversion==1
if 'msgversion' in msgdata and msgdata['msgversion'] != 1:
continue
if 'msgtype' in msgdata:
if msgdata['msgtype'] == messages.PATCHMSG_HELLO:
msg = PatchMessageHello()
elif msgdata['msgtype'] == messages.PATCHMSG_HELLO_ACK:
msg = PatchMessageHelloAck()
elif msgdata['msgtype'] == messages.PATCHMSG_SYNC_REQ:
msg = PatchMessageSyncReq()
elif msgdata['msgtype'] == messages.PATCHMSG_SYNC_COMPLETE:
msg = PatchMessageSyncComplete()
elif msgdata['msgtype'] == messages.PATCHMSG_HELLO_AGENT_ACK:
msg = PatchMessageHelloAgentAck()
elif msgdata['msgtype'] == messages.PATCHMSG_QUERY_DETAILED_RESP:
msg = PatchMessageQueryDetailedResp()
elif msgdata['msgtype'] == messages.PATCHMSG_AGENT_INSTALL_RESP:
msg = PatchMessageAgentInstallResp()
elif msgdata['msgtype'] == messages.PATCHMSG_DROP_HOST_REQ:
msg = PatchMessageDropHostReq()
if msg is None:
msg = messages.PatchMessage()
msg.decode(msgdata)
if s == pc.sock_in:
msg.handle(pc.sock_out, addr)
else:
msg.handle(s, addr)
# We can drop the connection after a query response
if msg.msgtype == messages.PATCHMSG_QUERY_DETAILED_RESP and s != pc.sock_in:
agent_query_conns.remove(s)
s.shutdown(socket.SHUT_RDWR)
s.close()
while len(stale_hosts) > 0 and len(agent_query_conns) <= 5:
ip = stale_hosts.pop()
try:
agent_sock = socket.create_connection((ip, cfg.agent_port))
query = PatchMessageQueryDetailed()
query.send(agent_sock)
agent_query_conns.append(agent_sock)
except Exception:
# Put it back on the list
stale_hosts.append(ip)
remaining = int(hello_timeout - time.time())
if remaining <= 0 or remaining > 30:
hello_timeout = time.time() + 30.0
remaining = 30
pc.socket_lock.acquire()
hello = PatchMessageHello()
hello.send(pc.sock_out)
hello_agent = PatchMessageHelloAgent()
hello_agent.send(pc.sock_out)
pc.socket_lock.release()
# Age out neighbours
pc.controller_neighbours_lock.acquire()
nbrs = list(pc.controller_neighbours)
for n in nbrs:
# Age out controllers after 2 minutes
if pc.controller_neighbours[n].get_age() >= 120:
LOG.info("Aging out controller %s from table", n)
del pc.controller_neighbours[n]
pc.controller_neighbours_lock.release()
pc.hosts_lock.acquire()
nbrs = list(pc.hosts)
for n in nbrs:
# Age out hosts after 1 hour
if pc.hosts[n].get_age() >= 3600:
LOG.info("Aging out host %s from table", n)
del pc.hosts[n]
for patch_id in list(pc.interim_state):
if n in pc.interim_state[patch_id]:
pc.interim_state[patch_id].remove(n)
pc.hosts_lock.release()
except Exception:
# Log all exceptions
LOG.exception("Error occurred during request processing")
thread_death.set()
def main():
configure_logging()
cfg.read_config()
# daemon.pidlockfile.write_pid_to_pidfile(pidfile_path)
global thread_death
thread_death = threading.Event()
# Set the TMPDIR environment variable to /scratch so that any modules
# that create directories with tempfile will not use /tmp
os.environ['TMPDIR'] = '/scratch'
global pc
pc = PatchController()
LOG.info("launching")
api_thread = PatchControllerApiThread()
auth_api_thread = PatchControllerAuthApiThread()
main_thread = PatchControllerMainThread()
api_thread.start()
auth_api_thread.start()
main_thread.start()
thread_death.wait()
global keep_running
keep_running = False
api_thread.join()
auth_api_thread.join()
main_thread.join()