Keystone DB sync - update dcorch to use dcdbsync
This commit updates dcorch to use the newly introduced dbsync service APIs
to synchronize identity resources from the central cloud to subclouds.

The following identity resources are synced:
- users (local users only)
- user passwords
- projects
- roles
- project role assignments
- token revocation events

Story: 2002842
Task: 22787
Depends-On: https://review.openstack.org/#/c/641498
Change-Id: I0840f42f6c7f380d74950fea165688a4b19a2695
Signed-off-by: Andy Ning <andy.ning@windriver.com>
Parent: f856d9f79a, commit: e9096c7a23
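Before the individual hunks, a brief orientation: each dcorch sync thread now holds two master-cloud clients, a keystone client for identity resources and a dcdbsync client for the dbsync service APIs. The sketch below is illustrative only (the helper name and the admin_session argument are assumptions, not part of the change); the client constructors and constants are the ones added in the sync_thread.py hunk near the end of this diff (@@ -124,9 +127,15).

from dcdbsync.dbsyncclient import client as dbsyncclient
from keystoneclient import client as keystoneclient

from dcorch.common import consts


def build_master_clients(admin_session):
    # Hypothetical helper: mirrors the client setup done by the sync thread
    # against the virtual master cloud region.
    ks_client = keystoneclient.Client(
        session=admin_session,
        region_name=consts.VIRTUAL_MASTER_CLOUD)
    # dcdbsync client used by the identity sync thread to reach the
    # dbsync service APIs over the internal endpoint
    dbs_client = dbsyncclient.Client(
        endpoint_type=consts.DBS_ENDPOINT_INTERNAL,
        session=admin_session,
        region_name=consts.VIRTUAL_MASTER_CLOUD)
    return ks_client, dbs_client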
@@ -356,7 +356,9 @@ class SubcloudsController(object):
             ('heat_admin', 'heat-domain'),
             ('gnocchi', 'gnocchi'),
             ('fm', 'fm'),
-            ('barbican', 'barbican')
+            ('barbican', 'barbican'),
+            ('smapi', 'smapi'),
+            ('dcdbsync', 'dcdbsync')
         ]

         user_list = list()
@@ -310,6 +310,7 @@ class SubcloudManager(manager.Manager):

         # Get the subcloud details from the database
         subcloud = db_api.subcloud_get(context, subcloud_id)
+        original_management_state = subcloud.management_state

         # Semantic checking
         if management_state:
@@ -357,7 +358,14 @@ class SubcloudManager(manager.Manager):
         except Exception as e:
             LOG.exception(e)
             LOG.warn('Problem informing dcorch of subcloud '
-                     'state change, subcloud: %s' % subcloud.name)
+                     'state change, resume to original state, subcloud: %s'
+                     % subcloud.name)
+            management_state = original_management_state

         subcloud = \
             db_api.subcloud_update(context, subcloud_id,
                                    management_state=management_state,
                                    description=description,
                                    location=location)

         if management_state == consts.MANAGEMENT_UNMANAGED:
@@ -343,6 +343,7 @@ class ComputeAPIController(APIController):
         resource_tag = self._get_resource_tag_from_header(request_header,
                                                           operation_type,
                                                           resource_type)

         handler = self._resource_handler[resource_tag]
         operation_type, resource_id, resource_info = handler(
             environ=environ,
@@ -454,7 +455,7 @@ class IdentityAPIController(APIController):
         return response

     def _generate_assignment_rid(self, url, environ):
-        resource_id = ''
+        resource_id = None
         # for role assignment or revocation, the URL is of format:
         # /v3/projects/{project_id}/users/{user_id}/roles/{role_id}
         # We need to extract all ID parameters from the URL
@@ -468,6 +469,23 @@ class IdentityAPIController(APIController):
         resource_id = "{}_{}_{}".format(proj_id, user_id, role_id)
         return resource_id

+    def _retrieve_token_revoke_event_rid(self, url, environ):
+        resource_id = None
+        # for token revocation event, we need to retrieve the audit_id
+        # from the token being revoked.
+        revoked_token = environ.get('HTTP_X_SUBJECT_TOKEN', None)
+
+        if not revoked_token:
+            LOG.error("Malformed Token Revocation URL: %s", url)
+        else:
+            try:
+                resource_id = proxy_utils.\
+                    retrieve_token_audit_id(revoked_token)
+            except Exception as e:
+                LOG.error("Failed to retrieve token audit id: %s" % e)
+
+        return resource_id
+
     def _enqueue_work(self, environ, request_body, response):
         LOG.info("enqueue_work")
         resource_info = {}
@@ -482,6 +500,26 @@ class IdentityAPIController(APIController):
               consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS):
             resource_id = self._generate_assignment_rid(request_header,
                                                         environ)
+            # grant a role to a user (PUT) creates a project role assignment
+            if operation_type == consts.OPERATION_TYPE_PUT:
+                operation_type = consts.OPERATION_TYPE_POST
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS):
+            resource_id = self._retrieve_token_revoke_event_rid(request_header,
+                                                                environ)
+            # delete (revoke) a token (DELETE) creates a token revoke event.
+            if operation_type == consts.OPERATION_TYPE_DELETE and resource_id:
+                operation_type = consts.OPERATION_TYPE_POST
+                resource_info = {'token_revoke_event':
+                                 {'audit_id': resource_id}}
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_USERS_PASSWORD):
+            resource_id = self.get_resource_id_from_link(request_header.
+                                                         strip('/password'))
+            # user change password (POST) is an update to the user
+            if operation_type == consts.OPERATION_TYPE_POST:
+                operation_type = consts.OPERATION_TYPE_PATCH
+                resource_type = consts.RESOURCE_TYPE_IDENTITY_USERS
         else:
             if operation_type == consts.OPERATION_TYPE_POST:
                 # Retrieve the ID from the response
@@ -490,20 +528,25 @@ class IdentityAPIController(APIController):
         else:
             resource_id = self.get_resource_id_from_link(request_header)

-        if (operation_type != consts.OPERATION_TYPE_DELETE and request_body):
+        if (operation_type != consts.OPERATION_TYPE_DELETE and
+                request_body and (not resource_info)):
             resource_info = json.loads(request_body)

         LOG.info("%s: Resource id: (%s), type: (%s), info: (%s)",
                  operation_type, resource_id, resource_type, resource_info)
-        try:
-            utils.enqueue_work(self.ctxt,
-                               self.ENDPOINT_TYPE,
-                               resource_type,
-                               resource_id,
-                               operation_type,
-                               json.dumps(resource_info))
-        except exception.ResourceNotFound as e:
-            raise webob.exc.HTTPNotFound(explanation=e.format_message())
+
+        if resource_id:
+            try:
+                utils.enqueue_work(self.ctxt,
+                                   self.ENDPOINT_TYPE,
+                                   resource_type,
+                                   resource_id,
+                                   operation_type,
+                                   json.dumps(resource_info))
+            except exception.ResourceNotFound as e:
+                raise webob.exc.HTTPNotFound(explanation=e.format_message())
+        else:
+            LOG.warning("Empty resource id for resource: %s", operation_type)


 class CinderAPIController(APIController):
@@ -297,7 +297,10 @@ IDENTITY_PROJECTS_PATH = [

 IDENTITY_PROJECTS_ROLE_PATH = [
     '/v3/projects/{project_id}/users/{user_id}/roles/{role_id}',
 ]

+IDENTITY_TOKEN_REVOKE_EVENTS_PATH = [
+    '/v3/auth/tokens',
+]

 IDENTITY_PATH_MAP = {
@@ -306,7 +309,9 @@ IDENTITY_PATH_MAP = {
     consts.RESOURCE_TYPE_IDENTITY_ROLES: IDENTITY_ROLES_PATH,
     consts.RESOURCE_TYPE_IDENTITY_PROJECTS: IDENTITY_PROJECTS_PATH,
     consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS:
-        IDENTITY_PROJECTS_ROLE_PATH
+        IDENTITY_PROJECTS_ROLE_PATH,
+    consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS:
+        IDENTITY_TOKEN_REVOKE_EVENTS_PATH,
 }

 ROUTE_METHOD_MAP = {
@@ -362,7 +367,9 @@ ROUTE_METHOD_MAP = {
         consts.RESOURCE_TYPE_IDENTITY_PROJECTS:
             ['POST', 'PATCH', 'DELETE'],
         consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS:
-            ['PUT', 'DELETE']
+            ['PUT', 'DELETE'],
+        consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS:
+            ['DELETE']

     }
 }
@@ -13,10 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from dcorch.common import consts
-from oslo_log import log as logging
+import base64
+from cryptography import fernet
+import msgpack
+import six
 from six.moves.urllib.parse import urlparse

+from keystoneauth1 import exceptions as keystone_exceptions
 from oslo_log import log as logging

+from dcorch.common import consts
+from dcorch.drivers.openstack import sdk_platform as sdk

 LOG = logging.getLogger(__name__)
@@ -108,3 +116,84 @@ def set_request_forward_environ(req, remote_host, remote_port):
     if ('REMOTE_ADDR' in req.environ and 'HTTP_X_FORWARDED_FOR' not in
             req.environ):
         req.environ['HTTP_X_FORWARDED_FOR'] = req.environ['REMOTE_ADDR']
+
+
+def _get_fernet_keys():
+    """Get fernet keys from sysinv."""
+    os_client = sdk.OpenStackDriver(consts.CLOUD_0)
+    try:
+        key_list = os_client.sysinv_client.get_fernet_keys()
+        return [str(getattr(key, 'key')) for key in key_list]
+    except (keystone_exceptions.connection.ConnectTimeout,
+            keystone_exceptions.ConnectFailure) as e:
+        LOG.info("get_fernet_keys: cloud {} is not reachable [{}]"
+                 .format(consts.CLOUD_0, str(e)))
+        os_client.delete_region_clients(consts.CLOUD_0)
+        return None
+    except (AttributeError, TypeError) as e:
+        LOG.info("get_fernet_keys error {}".format(e))
+        os_client.delete_region_clients(consts.CLOUD_0, clear_token=True)
+        return None
+    except Exception as e:
+        LOG.exception(e)
+        return None
+
+
+def _restore_padding(token):
+    """Restore padding based on token size.
+
+    :param token: token to restore padding on
+    :returns: token with correct padding
+    """
+
+    # Re-inflate the padding
+    mod_returned = len(token) % 4
+    if mod_returned:
+        missing_padding = 4 - mod_returned
+        token += b'=' * missing_padding
+    return token
+
+
+def _unpack_token(fernet_token, fernet_keys):
+    """Attempt to unpack a token using the supplied Fernet keys.
+
+    :param fernet_token: token to unpack
+    :type fernet_token: string
+    :param fernet_keys: a list consisting of keys in the repository
+    :type fernet_keys: list
+    :returns: the token payload
+    """
+
+    # create a list of fernet instances
+    fernet_instances = [fernet.Fernet(key) for key in fernet_keys]
+    # create a encryption/decryption object from the fernet keys
+    crypt = fernet.MultiFernet(fernet_instances)
+
+    # attempt to decode the token
+    token = _restore_padding(six.binary_type(fernet_token))
+    serialized_payload = crypt.decrypt(token)
+    payload = msgpack.unpackb(serialized_payload)
+
+    # present token values
+    return payload
+
+
+def retrieve_token_audit_id(fernet_token):
+    """Attempt to retrieve the audit id from the fernet token.
+
+    :param fernet_token:
+    :param keys_repository:
+    :return: audit id in base64 encoded (without paddings)
+    """
+
+    audit_id = None
+    fernet_keys = _get_fernet_keys()
+    LOG.info("fernet_keys: {}".format(fernet_keys))
+
+    if fernet_keys:
+        unpacked_token = _unpack_token(fernet_token, fernet_keys)
+        if unpacked_token:
+            audit_id = unpacked_token[-1][0]
+            audit_id = base64.urlsafe_b64encode(audit_id).rstrip('=')
+
+    return audit_id
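For orientation on how these helpers are used: the identity proxy (earlier hunks) pulls the revoked token from the X-Subject-Token header of a DELETE /v3/auth/tokens request and uses the audit id returned by retrieve_token_audit_id() as the resource id of a token_revoke_event. A minimal sketch with a hypothetical helper name follows; the real flow lives in IdentityAPIController._retrieve_token_revoke_event_rid() and _enqueue_work() above.

def token_revoke_event_from_environ(environ):
    # Hypothetical helper for illustration only; environ is the WSGI
    # environ of the proxied keystone request.
    revoked_token = environ.get('HTTP_X_SUBJECT_TOKEN', None)
    if not revoked_token:
        return None
    # urlsafe-base64 encoded audit id without padding, or None when the
    # fernet keys could not be fetched from sysinv
    audit_id = retrieve_token_audit_id(revoked_token)
    if not audit_id:
        return None
    return {'token_revoke_event': {'audit_id': audit_id}}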
@@ -122,6 +122,8 @@ RESOURCE_TYPE_IDENTITY_USERS_PASSWORD = "users_password"
 RESOURCE_TYPE_IDENTITY_ROLES = "roles"
 RESOURCE_TYPE_IDENTITY_PROJECTS = "projects"
 RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS = "project_role_assignments"
+RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS = "revoke_events"
+RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER = "revoke_events_for_user"

 KEYPAIR_ID_DELIM = "/"
@@ -151,6 +153,10 @@ ENDPOINT_QUOTA_MAPPING = {
 KS_ENDPOINT_INTERNAL = "internal"
 KS_ENDPOINT_DEFAULT = KS_ENDPOINT_INTERNAL

+# DB sync agent endpoint
+DBS_ENDPOINT_INTERNAL = "internal"
+DBS_ENDPOINT_DEFAULT = DBS_ENDPOINT_INTERNAL
+
 # Do we need separate patch/put operations or could we just use
 # create/update/delete and have the sync code know which HTTP
 # operation to use?
@@ -170,6 +170,10 @@ class SubcloudNotFound(NotFound):
     message = _("Subcloud %(region_name)s not found")


+class ThreadNotFound(NotFound):
+    message = _("Thread %(thread_name)s of %(region_name)s not found")
+
+
 class OrchJobNotFound(NotFound):
     message = _("OrchJob %(orch_job)s not found")
@@ -74,6 +74,10 @@ class SysinvClient(base.DriverBase):
             token = session.get_token()
             client = cgts_client.Client(
                 api_version,
+                username=session.auth._username,
+                password=session.auth._password,
+                tenant_name=session.auth._project_name,
+                auth_url=session.auth.auth_url,
                 endpoint=endpoint,
                 token=token)
         except exceptions.ServiceUnavailable:
@@ -710,7 +714,7 @@ class SysinvClient(base.DriverBase):

         return iuser

-    def create_fernet_repo(self, key_list):
+    def post_fernet_repo(self, key_list=None):
         """Add the fernet keys for this region

        :param: key list payload
@@ -721,26 +725,26 @@ class SysinvClient(base.DriverBase):
         # [{"id": 0, "key": "GgDAOfmyr19u0hXdm5r_zMgaMLjglVFpp5qn_N4GBJQ="},
         #  {"id": 1, "key": "7WfL_z54p67gWAkOmQhLA9P0ZygsbbJcKgff0uh28O8="},
         #  {"id": 2, "key": "5gsUQeOZ2FzZP58DN32u8pRKRgAludrjmrZFJSOHOw0="}]
-        LOG.info("create_fernet_repo driver region={} "
+        LOG.info("post_fernet_repo driver region={} "
                  "fernet_repo_list={}".format(self.region_name, key_list))
         try:
             self.client.fernet.create(key_list)
         except Exception as e:
-            LOG.error("create_fernet_repo exception={}".format(e))
+            LOG.error("post_fernet_repo exception={}".format(e))
             raise exceptions.SyncRequestFailedRetry()

-    def update_fernet_repo(self, key_list):
+    def put_fernet_repo(self, key_list):
         """Update the fernet keys for this region

        :param: key list payload
        :return: Nothing
        """
-        LOG.info("update_fernet_repo driver region={} "
+        LOG.info("put_fernet_repo driver region={} "
                  "fernet_repo_list={}".format(self.region_name, key_list))
         try:
             self.client.fernet.put(key_list)
         except Exception as e:
-            LOG.error("update_fernet_repo exception={}".format(e))
+            LOG.error("put_fernet_repo exception={}".format(e))
             raise exceptions.SyncRequestFailedRetry()

     def get_fernet_keys(self):
@@ -27,7 +27,6 @@ from dcorch.common.i18n import _
 from dcorch.common import manager
 from dcorch.common import utils
 from dcorch.drivers.openstack import sdk_platform as sdk
-from dcorch.objects import subcloud as subcloud_obj


 FERNET_REPO_MASTER_ID = "keys"
@@ -117,9 +116,26 @@ class FernetKeyManager(manager.Manager):
         self._schedule_work(consts.OPERATION_TYPE_PUT)

     def distribute_keys(self, ctxt, subcloud_name):
-        subclouds = subcloud_obj.SubcloudList.get_all(ctxt)
-        for sc in subclouds:
-            if sc.region_name == subcloud_name:
-                subcloud = sc
-                self._schedule_work(consts.OPERATION_TYPE_CREATE, subcloud)
-                break
+        keys = self._get_master_keys()
+        if not keys:
+            LOG.info(_("No fernet keys returned from %s") % consts.CLOUD_0)
+            return
+        resource_info = FernetKeyManager.to_resource_info(keys)
+        key_list = FernetKeyManager.from_resource_info(resource_info)
+        self.update_fernet_repo(subcloud_name, key_list)
+
+    def reset_keys(self, subcloud_name):
+        self.update_fernet_repo(subcloud_name)
+
+    @staticmethod
+    def update_fernet_repo(subcloud_name, key_list=None):
+        try:
+            os_client = sdk.OpenStackDriver(subcloud_name)
+            os_client.sysinv_client.post_fernet_repo(key_list)
+        except (exceptions.ConnectionRefused, exceptions.NotAuthorized,
+                exceptions.TimeOut):
+            LOG.info(_("Update the fernet repo on %s timeout") %
+                     subcloud_name)
+        except Exception as e:
+            error_msg = "subcloud: {}, {}".format(subcloud_name, e.message)
+            LOG.info(_("Fail to update fernet repo %s") % error_msg)
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-
 from oslo_log import log as logging

 from dcorch.common import exceptions
@@ -85,6 +84,14 @@ class GenericSyncManager(object):
         except KeyError:
             raise exceptions.SubcloudNotFound(region_name=subcloud_name)

+    def initial_sync(self, context, subcloud_name):
+        try:
+            subcloud_engine = self.subcloud_engines[subcloud_name]
+            LOG.info('Initial sync subcloud %(sc)s' % {'sc': subcloud_name})
+            subcloud_engine.initial_sync()
+        except KeyError:
+            raise exceptions.SubcloudNotFound(region_name=subcloud_name)
+
     def run_sync_audit(self):
         for subcloud_engine in self.subcloud_engines.values():
             subcloud_engine.run_sync_audit()
@@ -193,11 +193,26 @@ class EngineService(service.Service):
         # keep equivalent functionality for now
         if (management_state == dcm_consts.MANAGEMENT_MANAGED) and \
                 (availability_status == dcm_consts.AVAILABILITY_ONLINE):
-            self.fkm.distribute_keys(ctxt, subcloud_name)
-            self.aam.enable_snmp(ctxt, subcloud_name)
-            self.gsm.enable_subcloud(ctxt, subcloud_name)
+            # Initial identity sync. It's synchronous so that identity
+            # get synced before fernet token keys are synced. This is
+            # necessary since we want to revoke all existing tokens on
+            # this subcloud after its services user IDs and project
+            # IDs are changed. Otherwise subcloud services will fail
+            # authentication since they keep on using their existing tokens
+            # issued before these IDs change, until these tokens expires.
+            try:
+                self.gsm.initial_sync(ctxt, subcloud_name)
+                self.fkm.distribute_keys(ctxt, subcloud_name)
+                self.aam.enable_snmp(ctxt, subcloud_name)
+                self.gsm.enable_subcloud(ctxt, subcloud_name)
+            except Exception as ex:
+                LOG.warning('Update subcloud state failed for %s: %s',
+                            subcloud_name, six.text_type(ex))
+                raise
         else:
             self.gsm.disable_subcloud(ctxt, subcloud_name)
+            if (management_state == dcm_consts.MANAGEMENT_UNMANAGED):
+                self.fkm.reset_keys(subcloud_name)

     @request_context
     # todo: add authentication since ctxt not actually needed later
@@ -106,6 +106,11 @@ class SubCloudEngine(object):
         self.shutdown()
         self.subcloud.delete()

+    def initial_sync(self):
+        # initial synchronization of the subcloud
+        for thread in self.sync_threads:
+            thread.initial_sync()
+
     def run_sync_audit(self):
         # run periodic sync audit on all threads in this subcloud
         if self.is_enabled():
(Diff for one file suppressed because it is too large.)
@@ -799,7 +799,7 @@ class SysinvSyncThread(SyncThread):

         s_os_client = sdk.OpenStackDriver(self.region_name)
         try:
-            s_os_client.sysinv_client.create_fernet_repo(
+            s_os_client.sysinv_client.post_fernet_repo(
                 FernetKeyManager.from_resource_info(resource_info))
             # Ensure subcloud resource is persisted to the DB for later
             subcloud_rsrc_id = self.persist_db_subcloud_resource(
@@ -831,7 +831,7 @@ class SysinvSyncThread(SyncThread):

         s_os_client = sdk.OpenStackDriver(self.region_name)
         try:
-            s_os_client.sysinv_client.update_fernet_repo(
+            s_os_client.sysinv_client.put_fernet_repo(
                 FernetKeyManager.from_resource_info(resource_info))
             # Ensure subcloud resource is persisted to the DB for later
             subcloud_rsrc_id = self.persist_db_subcloud_resource(
@@ -1309,7 +1309,7 @@ class SysinvSyncThread(SyncThread):
                  resource_type),
                  extra=self.log_extra)

-    def audit_discrepancy(self, resource_type, m_resource, sc_resources):
+    def audit_discrepancy(self, resource_type, m_resource):
         # Return true to try the audit_action
         if resource_type in self.SYSINV_ADD_DELETE_RESOURCES:
             # It could be that the details are different
@@ -1329,7 +1329,7 @@ class SysinvSyncThread(SyncThread):
                  extra=self.log_extra)
         return False

-    def audit_action(self, resource_type, finding, resource):
+    def audit_action(self, resource_type, finding, resource, sc_source=None):
         if resource_type in self.SYSINV_MODIFY_RESOURCES:
             LOG.info("audit_action: {}/{}"
                      .format(finding, resource_type),
@@ -15,12 +15,10 @@

 import threading

-from oslo_config import cfg
 from oslo_log import log as logging

-from keystoneauth1 import loading
-from keystoneauth1 import session
-from keystoneclient import client as keystoneclient

+from dcdbsync.dbsyncclient import client as dbsyncclient
 from dcmanager.common import consts as dcmanager_consts
 from dcmanager.rpc import client as dcmanager_rpc_client
 from dcorch.common import consts
@@ -30,7 +28,11 @@ from dcorch.common import utils
 from dcorch.objects import orchrequest
 from dcorch.objects import resource
 from dcorch.objects import subcloud_resource
+from oslo_config import cfg

+from keystoneauth1 import loading
+from keystoneauth1 import session
+from keystoneclient import client as keystoneclient


 LOG = logging.getLogger(__name__)
@@ -81,6 +83,7 @@ class SyncThread(object):
         self.sc_admin_session = None
         self.admin_session = None
         self.ks_client = None
+        self.dbs_client = None

     def start(self):
         if self.status == STATUS_NEW:
@@ -124,9 +127,15 @@ class SyncThread(object):
             user_domain_name=cfg.CONF.cache.admin_user_domain_name)
         self.admin_session = session.Session(
             auth=auth, timeout=60, additional_headers=consts.USER_HEADER)
+        # keystone client
         self.ks_client = keystoneclient.Client(
             session=self.admin_session,
             region_name=consts.VIRTUAL_MASTER_CLOUD)
+        # dcdbsync client
+        self.dbs_client = dbsyncclient.Client(
+            endpoint_type=consts.DBS_ENDPOINT_INTERNAL,
+            session=self.admin_session,
+            region_name=consts.VIRTUAL_MASTER_CLOUD)

     def initialize_sc_clients(self):
         # base implementation of initializing the subcloud specific
@@ -164,6 +173,10 @@ class SyncThread(object):
             auth=sc_auth, timeout=60,
             additional_headers=consts.USER_HEADER)

+    def initial_sync(self):
+        # Return True to indicate initial sync success
+        return True
+
     def enable(self):
         # Called when DC manager thinks this subcloud is good to go.
         self.initialize()
@@ -530,6 +543,7 @@ class SyncThread(object):
                  extra=self.log_extra)
         # Subcloud resource is present in DB, but the check
         # for same_resource() was negative. Either the resource
         # disappeared from subcloud or the resource details
         # are different from that of master cloud. Let the
         # resource implementation decide on the audit action.
|
||||
# Resource is missing from subcloud, take action
|
||||
num_of_audit_jobs += self.audit_action(
|
||||
resource_type, AUDIT_RESOURCE_MISSING, m_r)
|
||||
|
||||
# As the subcloud resource is missing, invoke
|
||||
# the hook for dependants with no subcloud resource.
|
||||
# Resource implementation should handle this.
|
||||
@ -667,6 +682,9 @@ class SyncThread(object):
|
||||
def same_resource(self, resource_type, m_resource, sc_resource):
|
||||
return True
|
||||
|
||||
def has_same_ids(self, resource_type, m_resource, sc_resource):
|
||||
return False
|
||||
|
||||
def map_subcloud_resource(self, resource_type, m_r, m_rsrc_db,
|
||||
sc_resources):
|
||||
# Child classes can override this function to map an existing subcloud
|
||||
|