67215f30fe
This commit updates dcorch to use the newly introduced dbsync service
APIs to synchronize identity resources from central cloud to subclouds.
The following identity resources are synced:
- users (local users only)
- user passwords
- projects
- roles
- project role assignments
- token revocation events
Story: 2002842
Task: 22787
Signed-off-by: Andy Ning <andy.ning@windriver.com>
(cherry picked from commit e9096c7a23)
Depends-On: https://review.opendev.org/#/c/655921
Depends-On: https://review.opendev.org/#/c/655773
Depends-On: https://review.opendev.org/#/c/655776
Depends-On: https://review.opendev.org/#/c/655927
Change-Id: I77c2cc712a1c3dc8a228883c3fea1423e5207dea
745 lines
33 KiB
Python
745 lines
33 KiB
Python
# Copyright 2017 Wind River
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import threading
|
|
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
|
|
from dcdbsync.dbsyncclient import client as dbsyncclient
|
|
from dcmanager.common import consts as dcmanager_consts
|
|
from dcmanager.rpc import client as dcmanager_rpc_client
|
|
from dcorch.common import consts
|
|
from dcorch.common import context
|
|
from dcorch.common import exceptions
|
|
from dcorch.common import utils
|
|
from dcorch.objects import orchrequest
|
|
from dcorch.objects import resource
|
|
from dcorch.objects import subcloud_resource
|
|
|
|
from keystoneauth1 import loading
|
|
from keystoneauth1 import session
|
|
from keystoneclient import client as keystoneclient
|
|
|
|
|
|
LOG = logging.getLogger(__name__)

# Values stored in SyncThread.status.
STATUS_NEW = 'new'
STATUS_PROCESSING = 'processing'
STATUS_TIMEDOUT = 'timedout'
STATUS_SLEEPING = 'sleeping'
STATUS_SHUTTING_DOWN = 'shutting_down'  # is this actually needed?

# sync request states, should be in SyncRequest class
STATE_QUEUED = 'queued'
STATE_IN_PROGRESS = 'in-progress'
STATE_TIMEDOUT = 'timedout'
STATE_ABORTED = 'aborted'
STATE_FAILED = 'failed'
STATE_COMPLETED = 'completed'

# Audit findings, passed as the "finding" argument of audit_action().
AUDIT_RESOURCE_MISSING = 'missing'
AUDIT_RESOURCE_EXTRA = 'extra_resource'
|
|
|
|
|
|
class SyncThread(object):
    """Manages tasks related to resource management.

    One instance is owned by a subcloud engine.  It runs sync() in a
    worker thread to process queued orch requests and sync_audit() in a
    separate thread to compare master, dcorch DB and subcloud resources.
    The audit hook methods are meant to be overridden in inherited
    classes.
    """

    # Upper bound on attempts per request in the retry loop of sync().
    MAX_RETRY = 2
|
|
|
|
def __init__(self, subcloud_engine):
    """Create an idle sync thread object for one subcloud engine.

    No thread is started here.  The keystone/dbsync clients and both
    sessions are left as None until initialize() and
    initialize_sc_clients() populate them.

    :param subcloud_engine: engine that owns this object; its
        ``subcloud.region_name`` is read to build the log prefix.
    """
    super(SyncThread, self).__init__()
    self.endpoint_type = None               # endpoint type in keystone
    self.subcloud_engine = subcloud_engine  # engine that owns this obj
    self.thread = None                      # thread running sync()
    self.audit_thread = None                # thread running sync_audit()
    self.status = STATUS_NEW                # protected by condition lock
    self.audit_status = None                # todo: needed?
    self.condition = threading.Condition()  # used to wake up the thread
    self.ctxt = context.get_admin_context()
    # Maps resource type -> handler callable; looked up by sync_resource().
    self.sync_handler_map = {}
    self.master_region_name = consts.CLOUD_0
    # Resource types iterated by sync_audit().
    self.audit_resources = []

    # Passed as "extra" to log calls so messages carry the region name.
    self.log_extra = {
        "instance": self.subcloud_engine.subcloud.region_name + ": "}
    self.dcmanager_rpc_client = dcmanager_rpc_client.ManagerClient()
    # Last values recorded by set_sync_status().
    self.sync_status = dcmanager_consts.SYNC_STATUS_UNKNOWN
    self.subcloud_managed = False

    # Sessions and clients; created lazily, see initialize() and
    # initialize_sc_clients().
    self.sc_admin_session = None
    self.admin_session = None
    self.ks_client = None
    self.dbs_client = None
|
|
|
|
def start(self):
    """Launch the thread that runs sync().

    Only permitted while the status is still STATUS_NEW; any other
    status is logged as an error and nothing is started.
    """
    if self.status != STATUS_NEW:
        LOG.error("unable to start, not in new status",
                  extra=self.log_extra)
        return

    self.status = STATUS_PROCESSING
    # Publish the thread object before starting it: sync() reads
    # self.thread.name for its first log message.
    worker = threading.Thread(target=self.sync)
    self.thread = worker
    worker.start()
|
|
|
|
def shutdown(self):
    """Ask the worker threads to stop.

    Sets the status to STATUS_SHUTTING_DOWN under the condition lock and
    notifies one waiter so a sleeping sync thread wakes up and exits.
    """
    # Stop all work, optionally delete from DB
    with self.condition:
        self.status = STATUS_SHUTTING_DOWN
        # Wake the threads so they exit.
        self.condition.notify()
|
|
|
|
def should_exit(self):
    """Return whether the sync/audit threads should exit.

    Caller must hold the condition lock.
    """
    shutting_down = self.status == STATUS_SHUTTING_DOWN
    return shutting_down
|
|
|
|
def wake(self):
    """Wake the sync thread; called when work has been saved to the DB.

    Sets the status to STATUS_PROCESSING under the condition lock and
    notifies one waiter.
    """
    with self.condition:
        self.status = STATUS_PROCESSING
        self.condition.notify()
|
|
|
|
def initialize(self):
    """Base implementation of initializing the master clients.

    Builds an admin keystoneauth session from the ``cache`` config
    section and creates the keystone and dcdbsync clients on top of it.
    The specific SyncThread subclasses may extend this.
    """
    plugin_loader = loading.get_plugin_loader(
        cfg.CONF.keystone_authtoken.auth_type)
    cache_conf = cfg.CONF.cache
    admin_auth = plugin_loader.load_from_options(
        auth_url=cache_conf.auth_uri,
        username=cache_conf.admin_username,
        password=cache_conf.admin_password,
        project_name=cache_conf.admin_tenant,
        project_domain_name=cache_conf.admin_project_domain_name,
        user_domain_name=cache_conf.admin_user_domain_name)
    admin_session = session.Session(
        auth=admin_auth, timeout=60,
        additional_headers=consts.USER_HEADER)
    self.admin_session = admin_session

    # keystone client
    self.ks_client = keystoneclient.Client(
        session=admin_session,
        region_name=consts.VIRTUAL_MASTER_CLOUD)

    # dcdbsync client
    self.dbs_client = dbsyncclient.Client(
        endpoint_type=consts.DBS_ENDPOINT_INTERNAL,
        session=admin_session,
        region_name=consts.VIRTUAL_MASTER_CLOUD)
|
|
|
|
def initialize_sc_clients(self):
    """Base implementation of initializing the subcloud specific session.

    Only used by the subclasses, which may extend this.  Does nothing
    when ``self.sc_admin_session`` is already set.  Otherwise looks up
    the subcloud's internal identity endpoint through the master
    keystone client and builds an admin session against it.  Returns
    without creating a session when no such endpoint is found.
    """
    if self.sc_admin_session:
        return

    # Subclouds will use token from the Subcloud specific Keystone,
    # so define a session against that subcloud's identity
    keystone_services = self.ks_client.services.list(
        name='keystone', type='identity')
    endpoints = self.ks_client.endpoints.list(
        service=keystone_services[0].id,
        interface=consts.KS_ENDPOINT_INTERNAL,
        region=self.subcloud_engine.subcloud.region_name)
    try:
        LOG.info("Found sc_auth_url: {}".format(endpoints))
        sc_auth_url = endpoints[0].url
    except IndexError:
        # It may happen that this subcloud was not managed
        LOG.info("Cannot find identity auth_url",
                 extra=self.log_extra)
        return

    plugin_loader = loading.get_plugin_loader(
        cfg.CONF.keystone_authtoken.auth_type)
    cache_conf = cfg.CONF.cache
    subcloud_auth = plugin_loader.load_from_options(
        auth_url=sc_auth_url,
        username=cache_conf.admin_username,
        password=cache_conf.admin_password,
        project_name=cache_conf.admin_tenant,
        project_domain_name=cache_conf.admin_project_domain_name,
        user_domain_name=cache_conf.admin_user_domain_name)

    self.sc_admin_session = session.Session(
        auth=subcloud_auth, timeout=60,
        additional_headers=consts.USER_HEADER)
|
|
|
|
def initial_sync(self):
    """Return True to indicate initial sync success.

    The base implementation performs no work.
    """
    return True
|
|
|
|
def enable(self):
    """Bring the thread into service.

    Called when DC manager thinks this subcloud is good to go.  Runs, in
    order: initialize(), wake() and run_sync_audit().
    """
    for step in (self.initialize, self.wake, self.run_sync_audit):
        step()
|
|
|
|
def get_db_subcloud_resource(self, rsrc_id):
    """Fetch the subcloud_resource DB row for a resource in this subcloud.

    :param rsrc_id: the "id" field of the resource in the DB
    :returns: the SubcloudResource object, or None (after logging) when
        the lookup raises SubcloudResourceNotFound.
    """
    subcloud_id = self.subcloud_engine.subcloud.id
    lookup = subcloud_resource.SubcloudResource.get_by_resource_and_subcloud
    try:
        return lookup(self.ctxt, rsrc_id, subcloud_id)
    except exceptions.SubcloudResourceNotFound:
        LOG.info("{} not found in subcloud {} resource table".format(
            rsrc_id, self.subcloud_engine.subcloud.id),
            extra=self.log_extra)
    return None
|
|
|
|
def persist_db_subcloud_resource(self, db_rsrc_id, subcloud_rsrc_id):
    """Record the subcloud resource id for a DB resource.

    This function can be invoked after creating a subcloud resource, to
    persist the subcloud resource to the DB for later use.  Three cases:

    - no subcloud_resource row exists: create one;
    - a row exists with a different subcloud resource id: overwrite it
      with ``subcloud_rsrc_id`` and save;
    - a row exists with the same id: only log.

    :param db_rsrc_id: the "id" field of the resource in the DB
    :param subcloud_rsrc_id: the unique identifier of the subcloud
        resource
    :returns: the subcloud resource id stored on the row after the call
    """
    subcloud_rsrc = self.get_db_subcloud_resource(db_rsrc_id)
    if not subcloud_rsrc:
        subcloud_rsrc = subcloud_resource.SubcloudResource(
            self.ctxt, subcloud_resource_id=subcloud_rsrc_id,
            resource_id=db_rsrc_id,
            subcloud_id=self.subcloud_engine.subcloud.id)
        # There is no race condition for creation of
        # subcloud_resource as it is always done from the same thread.
        subcloud_rsrc.create()
    elif subcloud_rsrc.subcloud_resource_id != subcloud_rsrc_id:
        # May be the resource was manually deleted from the subcloud.
        # So, update the dcorch DB with the new resource id from subcloud.
        # Capture the stored id before overwriting it so the log shows
        # "old [new]" rather than the new id twice.
        previous_id = subcloud_rsrc.subcloud_resource_id
        subcloud_rsrc.subcloud_resource_id = subcloud_rsrc_id
        LOG.info("Updating {}:{} [{}]".format(
            db_rsrc_id, previous_id, subcloud_rsrc_id),
            extra=self.log_extra)
        subcloud_rsrc.save()
    else:
        LOG.info("subcloud_rsrc {}:{} [{}] is up-to-date"
                 .format(db_rsrc_id, subcloud_rsrc.subcloud_resource_id,
                         subcloud_rsrc_id),
                 extra=self.log_extra)
    return subcloud_rsrc.subcloud_resource_id
|
|
|
|
def sync_resource(self, sync_request):
    """Dispatch one sync request to the handler for its resource type.

    The resource is loaded by ``sync_request.orch_job.resource_id`` and
    the handler is taken from ``self.sync_handler_map``; a resource type
    with no registered handler raises KeyError.

    :param sync_request: request whose ``orch_job`` names the resource
        and the operation type
    """
    orch_job = sync_request.orch_job
    rsrc = resource.Resource.get_by_id(self.ctxt, orch_job.resource_id)
    resource_type = rsrc.resource_type
    handler = self.sync_handler_map[resource_type]
    message = "Invoking {} for {} [{}]".format(
        handler.__name__, resource_type, orch_job.operation_type)
    LOG.info(message, extra=self.log_extra)
    handler(sync_request, rsrc)
|
|
|
|
def set_sync_status(self, sync_status):
    """Report this endpoint's sync status to dcmanager.

    Only reports when the subcloud is managed.  For an unmanaged
    subcloud the cached status is reset to SYNC_STATUS_UNKNOWN and no
    RPC is sent.

    :param sync_status: status value forwarded to
        ``update_subcloud_endpoint_status``
    """
    managed = self.subcloud_engine.is_managed()
    if not managed:
        LOG.debug("set_sync_status: skip update sync update for unmanaged "
                  "subcloud {}".format(
                      self.subcloud_engine.subcloud.region_name))
        self.sync_status = dcmanager_consts.SYNC_STATUS_UNKNOWN
        self.subcloud_managed = False
        return

    # NOTE(review): this skips only when the status is unchanged AND the
    # cached managed flag differs from the current one; an unchanged
    # status with an unchanged flag is still re-sent on every call.
    # Confirm whether "==" was intended before changing it, since that
    # would alter how often dcmanager is updated.
    if self.sync_status == sync_status and self.subcloud_managed != managed:
        return

    self.sync_status = sync_status
    self.subcloud_managed = managed

    region_name = self.subcloud_engine.subcloud.region_name
    self.dcmanager_rpc_client.update_subcloud_endpoint_status(
        self.ctxt, region_name, self.endpoint_type, sync_status)
|
|
|
|
def sync(self):
    """Main loop of the sync thread (target of the thread made by start()).

    Runs until the status becomes STATUS_SHUTTING_DOWN.  Each iteration,
    with the condition lock held, it loads queued/in-progress orch
    requests for this endpoint type and region (only when the status is
    PROCESSING or TIMEDOUT), reports the sync status, and then either

    - waits on the condition when there is nothing to do, the subcloud
      is disabled, or the status is TIMEDOUT; or
    - releases the lock, processes every request with up to MAX_RETRY
      attempts each, and re-acquires the lock.

    Per-request outcomes:

    - SyncRequestTimeout: retried; once MAX_RETRY is reached the status
      is set to STATUS_TIMEDOUT and the rest of the batch is abandoned.
    - SyncRequestFailedRetry: request marked failed, then retried.
    - SyncRequestFailed: request marked failed, not retried.

    NOTE(review): there is no try/finally around the lock, so an
    unexpected exception raised while the lock is held (for example from
    the DB query or set_sync_status) would end this thread with the
    condition still locked — confirm whether callers of wake() and
    shutdown() can tolerate that.
    """
    LOG.info("{}: starting sync routine".format(self.thread.name),
             extra=self.log_extra)
    self.condition.acquire()
    self.status = STATUS_PROCESSING
    region_name = self.subcloud_engine.subcloud.region_name
    while self.status != STATUS_SHUTTING_DOWN:
        sync_requests = []
        # We want to check for pending work even if subcloud is disabled.
        if self.status in (STATUS_PROCESSING, STATUS_TIMEDOUT):
            states = [
                consts.ORCH_REQUEST_QUEUED,
                consts.ORCH_REQUEST_IN_PROGRESS,
            ]
            sync_requests = orchrequest.OrchRequestList.get_by_attrs(
                self.ctxt, self.endpoint_type,
                target_region_name=region_name,
                states=states)
            LOG.info("Got " + str(len(sync_requests)) + " sync request(s)",
                     extra=self.log_extra)
            # todo: for each request look up sync handler based on
            # resource type (I'm assuming here we're not storing a python
            # object in the DB)

        # Update dcmanager with the current sync status.
        subcloud_enabled = self.subcloud_engine.is_enabled()
        if sync_requests:
            self.set_sync_status(dcmanager_consts.SYNC_STATUS_OUT_OF_SYNC)
        else:
            self.set_sync_status(dcmanager_consts.SYNC_STATUS_IN_SYNC)

        if (not sync_requests or not subcloud_enabled or
                self.status == STATUS_TIMEDOUT):
            # Either there are no sync requests, or subcloud is disabled,
            # or we timed out trying to talk to it.
            # We're not going to process any sync requests, just go
            # back to sleep.
            if not subcloud_enabled:
                LOG.info("subcloud is disabled", extra=self.log_extra)
            # A TIMEDOUT status is left untouched here; do_sync_audit()
            # is what moves it back to PROCESSING.
            if self.status == STATUS_PROCESSING:
                self.status = STATUS_SLEEPING
            LOG.debug("calling condition.wait", extra=self.log_extra)
            # no work to do, sleep till someone wakes us
            self.condition.wait()
            LOG.debug("back from condition.wait", extra=self.log_extra)
        else:
            # Subcloud is enabled and there are pending sync requests, so
            # we have work to do.
            # The lock is dropped while talking to the subcloud so that
            # wake()/shutdown() are not blocked for the whole batch.
            self.condition.release()
            try:
                for request in sync_requests:
                    if not self.subcloud_engine.is_enabled() or \
                            self.should_exit():
                        # Oops, someone disabled the endpoint while
                        # we were processing work for it.
                        raise exceptions.EndpointNotReachable()
                    request.state = consts.ORCH_REQUEST_STATE_IN_PROGRESS
                    request.save()  # save to DB
                    retry_count = 0
                    while retry_count < self.MAX_RETRY:
                        try:
                            self.sync_resource(request)
                            request.state = \
                                consts.ORCH_REQUEST_STATE_COMPLETED
                            request.save()  # save to DB
                            break
                        except exceptions.SyncRequestTimeout:
                            request.try_count += 1
                            request.save()
                            retry_count += 1
                            if retry_count >= self.MAX_RETRY:
                                # todo: raise "unable to sync this
                                # subcloud/endpoint" alarm with fmapi
                                self.condition.acquire()
                                self.status = STATUS_TIMEDOUT
                                self.condition.release()
                                raise exceptions.EndpointNotReachable()
                        except exceptions.SyncRequestFailedRetry:
                            # todo: raise "unable to sync this
                            # subcloud/endpoint" alarm with fmapi
                            request.try_count += 1
                            request.state = \
                                consts.ORCH_REQUEST_STATE_FAILED
                            request.save()
                            retry_count += 1
                            # we'll retry
                        except exceptions.SyncRequestFailed:
                            request.state = \
                                consts.ORCH_REQUEST_STATE_FAILED
                            request.save()
                            # Force the retry loop to end without a retry.
                            retry_count = self.MAX_RETRY

                    # If we fall out of the retry loop we either succeeded
                    # or failed multiple times and want to move to the next
                    # request.

            except exceptions.EndpointNotReachable:
                # Endpoint not reachable, throw away all the sync requests.
                LOG.info("EndpointNotReachable, {} sync requests pending"
                         .format(len(sync_requests)))
                # del sync_requests[:] #This fails due to:
                # 'OrchRequestList' object does not support item deletion
            # Re-take the lock before the loop condition reads self.status.
            self.condition.acquire()
    # if we get here it's because we want this thread to exit
    self.condition.release()
    LOG.info("exiting thread for subcloud", extra=self.log_extra)
|
|
|
|
def run_sync_audit(self):
    """Kick off do_sync_audit() in a new thread.

    This will be called periodically as well as when the subcloud is
    enabled.  Returns immediately without starting anything when the
    subcloud is disabled, the thread is shutting down, or this endpoint
    type is listed in ``cfg.CONF.disable_audit_endpoints``.
    """
    if not self.subcloud_engine.is_enabled() or self.should_exit():
        return
    if self.endpoint_type in cfg.CONF.disable_audit_endpoints:
        # LOG.warn is a deprecated alias; use warning().
        LOG.warning("Audit disabled!", extra=self.log_extra)
        return
    # We want to make a new thread to do this so the caller
    # doesn't get blocked.
    thread = threading.Thread(target=self.do_sync_audit)
    thread.start()
    LOG.debug("{}: do_sync_audit started".format(thread.name),
              extra=self.log_extra)
|
|
|
|
def do_sync_audit(self):
    """Wake a timed-out sync thread and start an audit thread if needed.

    Runs under the condition lock.  The lock is taken with ``with`` so
    it is released even if starting the audit thread raises; otherwise
    the sync thread, wake() and shutdown() would block on it forever.
    """
    LOG.debug("In do sync audit", extra=self.log_extra)
    with self.condition:
        # This first part just checks to see if we want to wake up the
        # main sync thread.  We want to run this unconditionally.
        if self.status == STATUS_TIMEDOUT:
            self.status = STATUS_PROCESSING
            self.condition.notify()

        # Now we want to look at the actual sync audit.  If there's
        # already a sync audit thread running don't make a new one.
        if self.audit_thread is None or not self.audit_thread.is_alive():
            LOG.debug("Creating sync audit thread", extra=self.log_extra)
            self.audit_thread = threading.Thread(target=self.sync_audit)
            self.audit_thread.start()
        else:
            LOG.info("Skipping sync audit thread creation, already running",
                     extra=self.log_extra)
|
|
|
|
def sync_audit(self):
    """Audit every resource type in ``self.audit_resources``.

    For each type: collect the source resource ids that still have
    queued/in-progress sync requests (these are passed on as
    ``abort_resources``), fetch master/DB/subcloud resources, then run
    audit_find_missing() and audit_find_extra().

    The whole audit stops early when the subcloud is disabled, the
    thread is shutting down, or either the subcloud or master resource
    list is None.  An exception raised while auditing one type is logged
    and the loop moves on to the next type; such a run is not reported
    as a "clean" audit.
    """
    LOG.debug("{}: starting sync audit".format(self.audit_thread.name),
              extra=self.log_extra)

    total_num_of_audit_jobs = 0
    for resource_type in self.audit_resources:
        if not self.subcloud_engine.is_enabled() or self.should_exit():
            LOG.info("{}: aborting sync audit, as subcloud is disabled"
                     .format(self.audit_thread.name),
                     extra=self.log_extra)
            return

        # Skip resources with outstanding sync requests
        region_name = self.subcloud_engine.subcloud.region_name
        states = [
            consts.ORCH_REQUEST_QUEUED,
            consts.ORCH_REQUEST_IN_PROGRESS,
        ]
        sync_requests = orchrequest.OrchRequestList.get_by_attrs(
            self.ctxt, self.endpoint_type, resource_type=resource_type,
            target_region_name=region_name, states=states)
        abort_resources = [req.orch_job.source_resource_id
                           for req in sync_requests]
        if len(sync_requests) > 0:
            LOG.info("Will not audit {}. {} sync request(s) pending"
                     .format(abort_resources, len(sync_requests)),
                     extra=self.log_extra)

        num_of_audit_jobs = 0
        # Tracks an aborted audit so it is not logged as a clean run.
        audit_failed = False
        try:
            m_resources, db_resources, sc_resources = \
                self.get_all_resources(resource_type)

            # todo: delete entries in db_resources with no corresponding
            # entry in m_resources?

            if sc_resources is None or m_resources is None:
                return
            LOG.info("Audit {}: {} vs {}".format(
                resource_type, m_resources, sc_resources),
                extra=self.log_extra)
            LOG.debug("Auditing {}: master={} db={} sc={}".format(
                resource_type, m_resources, db_resources, sc_resources),
                extra=self.log_extra)
            # audit_find_missing() removes matched entries from
            # db_resources, so it has to run before audit_find_extra().
            num_of_audit_jobs += self.audit_find_missing(
                resource_type, m_resources, db_resources, sc_resources,
                abort_resources)
            num_of_audit_jobs += self.audit_find_extra(
                resource_type, m_resources, db_resources, sc_resources,
                abort_resources)
        except Exception as e:
            LOG.exception(e)
            audit_failed = True

        # Extra resources in subcloud are not impacted by the audit.

        if not num_of_audit_jobs and not audit_failed:
            LOG.info("Clean audit run for {}".format(resource_type),
                     extra=self.log_extra)
        total_num_of_audit_jobs += num_of_audit_jobs

    if not total_num_of_audit_jobs:
        # todo: if we had an "unable to sync this
        # subcloud/endpoint" alarm raised, then clear it
        pass

    LOG.debug("{}: done sync audit".format(self.audit_thread.name),
              extra=self.log_extra)
|
|
|
|
def audit_find_missing(self, resource_type, m_resources,
                       db_resources, sc_resources,
                       abort_resources):
    """Find missing resources in subcloud.

    - Input param db_resources is modified in this routine
      to remove entries that match the resources in
      master cloud. At the end, db_resources will have a
      list of resources that are present in dcorch DB, but
      not present in the master cloud.

    :param resource_type: type of the resources being audited
    :param m_resources: resources read from the master cloud
    :param db_resources: resources read from the dcorch DB (mutated)
    :param sc_resources: resources read from the subcloud
    :param abort_resources: master resource ids with pending sync
        requests; these are skipped
    :returns: number of audit jobs: one per skipped pending resource,
        plus whatever audit_action() and audit_dependants() return
    """
    num_of_audit_jobs = 0
    for m_r in m_resources:
        master_id = self.get_resource_id(resource_type, m_r)
        if master_id in abort_resources:
            LOG.info("audit_find_missing: Aborting audit for {}"
                     .format(master_id), extra=self.log_extra)
            num_of_audit_jobs += 1
            # There are pending jobs for this resource, abort audit
            continue

        missing_resource = False
        m_rsrc_db = None
        for db_resource in db_resources:
            if db_resource.master_id == master_id:
                m_rsrc_db = db_resource
                # Removing while iterating is safe only because of the
                # immediate break below.
                db_resources.remove(db_resource)
                break

        if m_rsrc_db:
            # resource from master cloud is present in DB.

            # Contents of "m_r" may refer to other master cloud resources.
            # Make a copy with the references updated to refer to subcloud
            # resources.
            try:
                m_r_updated = self.update_resource_refs(resource_type, m_r)
            except exceptions.SubcloudResourceNotFound:
                # If we couldn't find the equivalent subcloud resources,
                # we don't know what to look for in the subcloud so skip
                # this m_r and go to the next one.
                continue

            # Now, look for subcloud resource in DB.
            # If present: look for actual resource in the
            #             subcloud and compare the resource details.
            # If not present: create resource in subcloud.
            db_sc_resource = self.get_db_subcloud_resource(m_rsrc_db.id)
            if db_sc_resource:
                if not db_sc_resource.is_managed():
                    LOG.info("Resource {} is not managed"
                             .format(master_id), extra=self.log_extra)
                    continue
                sc_rsrc_present = False
                for sc_r in sc_resources:
                    sc_id = self.get_resource_id(resource_type, sc_r)
                    if sc_id == db_sc_resource.subcloud_resource_id:
                        # The id match alone is not enough; the resource
                        # counts as present only if same_resource() agrees.
                        if self.same_resource(resource_type,
                                              m_r_updated, sc_r):
                            LOG.info("Resource type {} {} is in-sync"
                                     .format(resource_type, master_id),
                                     extra=self.log_extra)
                            num_of_audit_jobs += self.audit_dependants(
                                resource_type, m_r, sc_r)
                            sc_rsrc_present = True
                            break
                if not sc_rsrc_present:
                    LOG.info(
                        "Subcloud resource {} found in master cloud & DB, "
                        "but the exact same resource not found in subcloud"
                        .format(db_sc_resource.subcloud_resource_id),
                        extra=self.log_extra)
                    # Subcloud resource is present in DB, but the check
                    # for same_resource() was negative. Either the resource
                    # disappeared from subcloud or the resource details
                    # are different from that of master cloud. Let the
                    # resource implementation decide on the audit action.
                    missing_resource = self.audit_discrepancy(
                        resource_type, m_r, sc_resources)
            else:
                LOG.info("Subcloud res {} not found in DB, will create"
                         .format(master_id), extra=self.log_extra)
                # Check and see if there are any subcloud resources that
                # match the master resource, and if so set up mappings.
                # This returns true if it finds a match.
                if self.map_subcloud_resource(resource_type, m_r_updated,
                                              m_rsrc_db, sc_resources):
                    continue
                missing_resource = True

        else:  # master_resource not in resource DB
            LOG.info("{} not found in DB, will create it"
                     .format(master_id),
                     extra=self.log_extra)
            missing_resource = True

        if missing_resource:
            # Resource is missing from subcloud, take action
            num_of_audit_jobs += self.audit_action(
                resource_type, AUDIT_RESOURCE_MISSING, m_r)

            # As the subcloud resource is missing, invoke
            # the hook for dependants with no subcloud resource.
            # Resource implementation should handle this.
            num_of_audit_jobs += self.audit_dependants(
                resource_type, m_r, None)
    return num_of_audit_jobs
|
|
|
|
def audit_find_extra(self, resource_type, m_resources,
                     db_resources, sc_resources, abort_resources):
    """Find extra resources in subcloud.

    - Input param db_resources is expected to be a
      list of resources that are present in dcorch DB, but
      not present in the master cloud.

    Entries without a master_id are ignored.  An entry with a pending
    sync request counts as one job and is skipped.  For the rest, an
    AUDIT_RESOURCE_EXTRA action is taken when a managed subcloud
    resource row exists for it.

    :returns: number of audit jobs counted or scheduled
    """
    job_count = 0
    # At this point, db_resources contains resources present in DB,
    # but not in master cloud
    for leftover in db_resources:
        if not leftover.master_id:
            continue

        if leftover.master_id in abort_resources:
            LOG.info("audit_find_extra: Aborting audit for {}"
                     .format(leftover.master_id),
                     extra=self.log_extra)
            job_count += 1
            # There are pending jobs for this resource, abort audit
            continue

        LOG.debug("Extra resource ({}) in DB".format(leftover.id),
                  extra=self.log_extra)
        sc_row = self.get_db_subcloud_resource(leftover.id)
        if not sc_row:
            # Resource is present in resource table, but not in
            # subcloud_resource table. We have also established that
            # the corresponding OpenStack resource is not present in
            # the master cloud.
            # There might be another subcloud with "unmanaged"
            # subcloud resource corresponding to this resource.
            # So, just ignore this here!
            continue

        if not sc_row.is_managed():
            LOG.info("Resource {} is not managed"
                     .format(sc_row.subcloud_resource_id),
                     extra=self.log_extra)
            continue

        LOG.info("Resource ({}) and subcloud resource ({}) "
                 "not in sync with master cloud"
                 .format(leftover.master_id,
                         sc_row.subcloud_resource_id),
                 extra=self.log_extra)
        # There is extra resource in the subcloud, take action.
        # Note that the resource is in dcorch DB, but not
        # actually present in the master cloud.
        job_count += self.audit_action(
            resource_type, AUDIT_RESOURCE_EXTRA, leftover)
    return job_count
|
|
|
|
def schedule_work(self, endpoint_type, resource_type,
                  source_resource_id, operation_type,
                  resource_info=None):
    """Enqueue a work item for this subcloud and wake the sync thread.

    Any exception raised while enqueuing or waking is logged at info
    level and not propagated.

    :param endpoint_type: endpoint type the work belongs to
    :param resource_type: type of the resource to operate on
    :param source_resource_id: id of the source resource
    :param operation_type: operation to perform
    :param resource_info: optional payload passed to the work item
    """
    announcement = "Scheduling {} work for {}/{}".format(
        operation_type, resource_type, source_resource_id)
    LOG.info(announcement, extra=self.log_extra)
    try:
        utils.enqueue_work(
            self.ctxt, endpoint_type, resource_type,
            source_resource_id, operation_type, resource_info,
            subcloud=self.subcloud_engine.subcloud)
        self.wake()
    except Exception as exc:
        failure = "Exception in schedule_work: {}".format(str(exc))
        LOG.info(failure, extra=self.log_extra)
|
|
|
|
def get_resource_id(self, resource_type, resource):
    """Return the id used to identify a resource during audit.

    :param resource_type: unused in the base implementation
    :param resource: a dcorch DB resource or a queried resource
    :returns: ``resource.master_id`` when the object has that attribute
        (resource from the DB), otherwise ``resource.id``
    """
    if not hasattr(resource, 'master_id'):
        # Not a DB resource: use the id field (by default)
        return resource.id
    # Resource from DB: return master resource id from master cloud
    return resource.master_id
|
|
|
|
# Audit functions to be overridden in inherited classes
|
|
def get_all_resources(self, resource_type):
    """Collect master, DB and subcloud resources for one resource type.

    The subcloud is queried first.  If that yields None, the audit is
    aborted and neither the DB nor the master cloud is queried.

    :returns: tuple ``(m_resources, db_resources, sc_resources)``;
        all three are None when the subcloud query returned None
    """
    # Query subcloud first. If not reachable, abort audit.
    sc_resources = self.get_subcloud_resources(resource_type)
    if sc_resources is None:
        return None, None, sc_resources

    db_resources = self.get_db_master_resources(resource_type)
    # todo: master resources will be read by multiple threads
    # depending on the number of subclouds. Could do some kind of
    # caching for performance improvement.
    m_resources = self.get_master_resources(resource_type)
    return m_resources, db_resources, sc_resources
|
|
|
|
def get_subcloud_resources(self, resource_type):
    """Audit hook: return the subcloud's resources of ``resource_type``.

    The base implementation returns None.
    """
    return None
|
|
|
|
def get_db_master_resources(self, resource_type):
    """Return the dcorch DB resources of ``resource_type`` as a new list."""
    db_rows = resource.ResourceList.get_all(self.ctxt, resource_type)
    return list(db_rows)
|
|
|
|
def get_master_resources(self, resource_type):
    """Audit hook: return the master cloud's resources of ``resource_type``.

    The base implementation returns None.
    """
    return None
|
|
|
|
def same_resource(self, resource_type, m_resource, sc_resource):
    """Audit hook: tell whether a master and a subcloud resource match.

    The base implementation always returns True.
    """
    return True
|
|
|
|
def has_same_ids(self, resource_type, m_resource, sc_resource):
    """Audit hook comparing master and subcloud resource ids.

    The base implementation always returns False.
    """
    return False
|
|
|
|
def map_subcloud_resource(self, resource_type, m_r, m_rsrc_db,
                          sc_resources):
    """Audit hook: map an existing subcloud resource to a master resource.

    Child classes can override this function to map an existing subcloud
    resource to an existing master resource.  If a mapping is created
    the function should return True.

    It is expected that update_resource_refs() has been called on m_r.

    The base implementation creates no mapping and returns False.
    """
    return False
|
|
|
|
def update_resource_refs(self, resource_type, m_r):
    """Audit hook: rewrite references to other master resources in m_r.

    Child classes can override this function to update any references
    to other master resources embedded within the info of this resource.

    The base implementation returns ``m_r`` itself, unchanged.
    """
    return m_r
|
|
|
|
def audit_dependants(self, resource_type, m_resource, sc_resource):
    """Audit hook for resources that depend on ``m_resource``.

    ``sc_resource`` may be None when the subcloud resource is missing.
    The base implementation schedules nothing and always returns 0.

    :returns: number of audit jobs created
    """
    audit_jobs = 0
    stop_now = (not self.subcloud_engine.is_enabled()
                or self.should_exit())
    if stop_now:
        return audit_jobs
    if not sc_resource:
        # Handle None value for sc_resource
        pass
    return audit_jobs
|
|
|
|
def audit_discrepancy(self, resource_type, m_resource, sc_resources):
    """Audit hook: decide what to do about a mismatched subcloud resource.

    Return true to try creating the resource again; the base
    implementation always does.
    """
    return True
|
|
|
|
def audit_action(self, resource_type, finding, resource):
    """Schedule the default work item for an audit finding.

    Default actions are create & delete.  Can be overridden in resource
    implementation.

    :param resource_type: type of the audited resource
    :param finding: AUDIT_RESOURCE_MISSING or AUDIT_RESOURCE_EXTRA;
        any other value schedules nothing
    :param resource: either from dcorch DB or fetched by OpenStack query
    :returns: number of audit jobs scheduled (0 or 1)
    """
    LOG.info("audit_action: {}/{}"
             .format(finding, resource_type),
             extra=self.log_extra)
    resource_id = self.get_resource_id(resource_type, resource)

    if finding == AUDIT_RESOURCE_MISSING:
        # default action is create for a 'missing' resource
        create_info = self.get_resource_info(
            resource_type, resource, consts.OPERATION_TYPE_CREATE)
        self.schedule_work(
            self.endpoint_type, resource_type, resource_id,
            consts.OPERATION_TYPE_CREATE, create_info)
        return 1

    if finding == AUDIT_RESOURCE_EXTRA:
        # default action is delete for an 'extra_resource'
        # resource passed in is db_resource (resource in dcorch DB)
        self.schedule_work(
            self.endpoint_type, resource_type, resource_id,
            consts.OPERATION_TYPE_DELETE)
        return 1

    return 0
|
|
|
|
def get_resource_info(self, resource_type, resource, operation_type=None):
    """Audit hook: build the payload passed along with scheduled work.

    The base implementation returns an empty string.
    """
    return ""
|