Apply black formatter to dcdbsync/api

This commit applies the Black format to the `dcdbsync/api`
files to ensure that it adheres to the Black code style guidelines.

Test Plan:
PASS: Success in stx-distcloud-tox-black

Story: 2011149
Task: 50443

Change-Id: Iedeaf1eced6dad414b3ef538798c7dd14b0249d6
Signed-off-by: Hugo Brito <hugo.brito@windriver.com>
This commit is contained in:
Hugo Brito 2024-06-26 14:01:40 -03:00 committed by Hugo Nicodemos
parent 69970aac60
commit 83dbf64aca
15 changed files with 232 additions and 205 deletions

22
.git-blame-ignore-revs Normal file
View File

@ -0,0 +1,22 @@
# git hyper-blame master ignore list.
#
# This file contains a list of git hashes of revisions to be ignored by git
# hyper-blame. These revisions are considered "unimportant" in that they are
# unlikely to be what you are interested in when blaming.
#
# Instructions:
# - Only large (generally automated) reformatting or renaming commits should be
# added to this list. Do not put things here just because you feel they are
# trivial or unimportant. If in doubt, do not put it on this list.
# - Precede each revision with a comment containing the first line of its log.
# For bulk work over many commits, place all commits in a block with a single
# comment at the top describing the work done in those commits.
# - Only put full 40-character hashes on this list (not short hashes or any
# other revision reference).
# - Append to the bottom of the file (revisions should be in chronological order
# from oldest to newest).
# - Because you must use a hash, you need to append to this list in a follow-up
# commit to the actual reformatting commit that you are trying to ignore.
# Format all Python files with Black formatter
69970aac60f91f1bc7cbc1323364a5a782756cfe

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2019 Wind River Systems, Inc.
# Copyright (c) 2019, 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -38,29 +38,36 @@ from dcdbsync.common import version
LOG = logging.getLogger(__name__)
common_opts = [
cfg.StrOpt('bind_host', default='0.0.0.0',
help=_("The host IP to bind to")),
cfg.IntOpt('bind_port', default=8119,
help=_("The port to bind to")),
cfg.IntOpt('api_workers', default=2,
help=_("number of api workers")),
cfg.StrOpt('state_path',
default=os.path.join(os.path.dirname(__file__), '../'),
help='Top-level directory for maintaining dcdbsync state'),
cfg.StrOpt('api_extensions_path', default="",
help=_("The path for API extensions")),
cfg.StrOpt('auth_strategy', default='keystone',
help=_("The type of authentication to use")),
cfg.BoolOpt('allow_bulk', default=True,
help=_("Allow the usage of the bulk API")),
cfg.BoolOpt('allow_pagination', default=False,
help=_("Allow the usage of the pagination")),
cfg.BoolOpt('allow_sorting', default=False,
help=_("Allow the usage of the sorting")),
cfg.StrOpt('pagination_max_limit', default="-1",
help=_("The maximum number of items returned in a single "
cfg.StrOpt("bind_host", default="0.0.0.0", help=_("The host IP to bind to")),
cfg.IntOpt("bind_port", default=8119, help=_("The port to bind to")),
cfg.IntOpt("api_workers", default=2, help=_("number of api workers")),
cfg.StrOpt(
"state_path",
default=os.path.join(os.path.dirname(__file__), "../"),
help="Top-level directory for maintaining dcdbsync state",
),
cfg.StrOpt(
"api_extensions_path", default="", help=_("The path for API extensions")
),
cfg.StrOpt(
"auth_strategy", default="keystone", help=_("The type of authentication to use")
),
cfg.BoolOpt("allow_bulk", default=True, help=_("Allow the usage of the bulk API")),
cfg.BoolOpt(
"allow_pagination", default=False, help=_("Allow the usage of the pagination")
),
cfg.BoolOpt(
"allow_sorting", default=False, help=_("Allow the usage of the sorting")
),
cfg.StrOpt(
"pagination_max_limit",
default="-1",
help=_(
"The maximum number of items returned in a single "
"response, value was 'infinite' or negative integer "
"means no limit")),
"means no limit"
),
),
]
@ -72,9 +79,12 @@ def init(args, **kwargs):
# auth.register_conf_options(cfg.CONF)
logging.register_options(cfg.CONF)
cfg.CONF(args=args, project='dcdbsync',
version='%%(prog)s %s' % version.version_info.release_string(),
**kwargs)
cfg.CONF(
args=args,
project="dcdbsync",
version="%%(prog)s %s" % version.version_info.release_string(),
**kwargs
)
def setup_logging():
@ -82,9 +92,10 @@ def setup_logging():
product_name = "dcdbsync"
logging.setup(cfg.CONF, product_name)
LOG.info("Logging enabled!")
LOG.info("%(prog)s version %(version)s",
{'prog': sys.argv[0],
'version': version.version_info.release_string()})
LOG.info(
"%(prog)s version %(version)s",
{"prog": sys.argv[0], "version": version.version_info.release_string()},
)
LOG.debug("command line: %s", " ".join(sys.argv))

View File

@ -33,20 +33,14 @@ def setup_app(*args, **kwargs):
opts = cfg.CONF.pecan
config = {
'server': {
'port': cfg.CONF.bind_port,
'host': cfg.CONF.bind_host
},
'app': {
'root': 'dcdbsync.api.controllers.root.RootController',
'modules': ['dcdbsync.api'],
"server": {"port": cfg.CONF.bind_port, "host": cfg.CONF.bind_host},
"app": {
"root": "dcdbsync.api.controllers.root.RootController",
"modules": ["dcdbsync.api"],
"debug": opts.debug,
"auth_enable": opts.auth_enable,
'errors': {
400: '/error',
'__force_dict__': True
}
}
"errors": {400: "/error", "__force_dict__": True},
},
}
pecan_config = pecan.configuration.conf_from_dict(config)
@ -59,7 +53,7 @@ def setup_app(*args, **kwargs):
wrap_app=_wrap_app,
force_canonical=False,
hooks=lambda: [ctx.AuthHook()],
guess_content_type_from_ext=True
guess_content_type_from_ext=True,
)
return app
@ -67,10 +61,10 @@ def setup_app(*args, **kwargs):
def _wrap_app(app):
app = request_id.RequestId(app)
if cfg.CONF.pecan.auth_enable and cfg.CONF.auth_strategy == 'keystone':
if cfg.CONF.pecan.auth_enable and cfg.CONF.auth_strategy == "keystone":
conf = dict(cfg.CONF.keystone_authtoken)
# Change auth decisions of requests to the app itself.
conf.update({'delay_auth_decision': True})
conf.update({"delay_auth_decision": True})
# NOTE: Policy enforcement works only if Keystone
# authentication is enabled. No support for other authentication
@ -86,7 +80,7 @@ _launcher = None
def serve(api_service, conf, workers=1):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
raise RuntimeError(_("serve() can only be called once"))
_launcher = service.launch(conf, api_service, workers=workers)

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2019, 2022 Wind River Systems, Inc.
# Copyright (c) 2019, 2022, 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -25,22 +25,24 @@ import dcdbsync.common.context as k_context
def extract_context_from_environ():
context_paras = {'auth_token': 'HTTP_X_AUTH_TOKEN',
'user': 'HTTP_X_USER_ID',
'project': 'HTTP_X_TENANT_ID',
'user_name': 'HTTP_X_USER_NAME',
'tenant_name': 'HTTP_X_PROJECT_NAME',
'domain': 'HTTP_X_DOMAIN_ID',
'roles': 'HTTP_X_ROLE',
'user_domain': 'HTTP_X_USER_DOMAIN_ID',
'project_domain': 'HTTP_X_PROJECT_DOMAIN_ID',
'request_id': 'openstack.request_id'}
context_paras = {
"auth_token": "HTTP_X_AUTH_TOKEN",
"user": "HTTP_X_USER_ID",
"project": "HTTP_X_TENANT_ID",
"user_name": "HTTP_X_USER_NAME",
"tenant_name": "HTTP_X_PROJECT_NAME",
"domain": "HTTP_X_DOMAIN_ID",
"roles": "HTTP_X_ROLE",
"user_domain": "HTTP_X_USER_DOMAIN_ID",
"project_domain": "HTTP_X_PROJECT_DOMAIN_ID",
"request_id": "openstack.request_id",
}
environ = request.environ
for key, val in context_paras.items():
context_paras[key] = environ.get(val)
role = environ.get('HTTP_X_ROLE')
role = environ.get("HTTP_X_ROLE")
context_paras['is_admin'] = 'admin' in role.split(',')
context_paras["is_admin"] = "admin" in role.split(",")
return k_context.RequestContext(**context_paras)

View File

@ -25,16 +25,16 @@ from dcdbsync.api.controllers.v1 import root as v1_root
class RootController(object):
@pecan.expose('json')
@pecan.expose("json")
def _lookup(self, version, *remainder):
version = str(version)
minor_version = version[-1]
major_version = version[1]
remainder = remainder + (minor_version,)
if major_version == '1':
if major_version == "1":
return v1_root.Controller(), remainder
@pecan.expose(generic=True, template='json')
@pecan.expose(generic=True, template="json")
def index(self):
return {
"versions": [
@ -43,19 +43,19 @@ class RootController(object):
"links": [
{
"rel": "self",
"href": pecan.request.application_url + "/v1.0/"
"href": pecan.request.application_url + "/v1.0/",
}
],
"id": "v1.0",
"updated": "2018-11-20"
"updated": "2018-11-20",
}
]
}
@index.when(method='POST')
@index.when(method='PUT')
@index.when(method='DELETE')
@index.when(method='HEAD')
@index.when(method='PATCH')
@index.when(method="POST")
@index.when(method="PUT")
@index.when(method="DELETE")
@index.when(method="HEAD")
@index.when(method="PATCH")
def not_supported(self):
pecan.abort(405)

View File

@ -38,7 +38,7 @@ LOG = logging.getLogger(__name__)
class UsersController(object):
VERSION_ALIASES = {
'Stein': '1.0',
"Stein": "1.0",
}
def __init__(self):
@ -49,12 +49,12 @@ class UsersController(object):
version_cap = 1.0
return version_cap
@expose(generic=True, template='json')
@expose(generic=True, template="json")
def index(self):
# Route the request to specific methods with parameters
pass
@index.when(method='GET', template='json')
@index.when(method="GET", template="json")
def get(self, user_ref=None):
"""Get a list of users."""
context = restcomm.extract_context_from_environ()
@ -71,9 +71,9 @@ class UsersController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to get user'))
pecan.abort(500, _("Unable to get user"))
@index.when(method='POST', template='json')
@index.when(method="POST", template="json")
def post(self):
"""Create a new user."""
@ -83,14 +83,14 @@ class UsersController(object):
try:
payload = json.loads(request.body)
except ValueError:
pecan.abort(400, _('Request body decoding error'))
pecan.abort(400, _("Request body decoding error"))
if not payload:
pecan.abort(400, _('Body required'))
user_name = payload.get('local_user').get('name')
pecan.abort(400, _("Body required"))
user_name = payload.get("local_user").get("name")
if not user_name:
pecan.abort(400, _('User name required'))
pecan.abort(400, _("User name required"))
try:
# Insert the user into DB tables
@ -100,25 +100,25 @@ class UsersController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to create user'))
pecan.abort(500, _("Unable to create user"))
@index.when(method='PUT', template='json')
@index.when(method="PUT", template="json")
def put(self, user_ref=None):
"""Update a existing user."""
context = restcomm.extract_context_from_environ()
if user_ref is None:
pecan.abort(400, _('User ID required'))
pecan.abort(400, _("User ID required"))
# Convert JSON string in request to Python dict
try:
payload = json.loads(request.body)
except ValueError:
pecan.abort(400, _('Request body decoding error'))
pecan.abort(400, _("Request body decoding error"))
if not payload:
pecan.abort(400, _('Body required'))
pecan.abort(400, _("Body required"))
try:
# Update the user in DB tables
@ -129,12 +129,12 @@ class UsersController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to update user'))
pecan.abort(500, _("Unable to update user"))
class GroupsController(object):
VERSION_ALIASES = {
'Stein': '1.0',
"Stein": "1.0",
}
def __init__(self):
@ -145,12 +145,12 @@ class GroupsController(object):
version_cap = 1.0
return version_cap
@expose(generic=True, template='json')
@expose(generic=True, template="json")
def index(self):
# Route the request to specific methods with parameters
pass
@index.when(method='GET', template='json')
@index.when(method="GET", template="json")
def get(self, group_ref=None):
"""Get a list of groups."""
context = restcomm.extract_context_from_environ()
@ -167,9 +167,9 @@ class GroupsController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to get group'))
pecan.abort(500, _("Unable to get group"))
@index.when(method='POST', template='json')
@index.when(method="POST", template="json")
def post(self):
"""Create a new group."""
@ -179,14 +179,14 @@ class GroupsController(object):
try:
payload = json.loads(request.body)
except ValueError:
pecan.abort(400, _('Request body decoding error'))
pecan.abort(400, _("Request body decoding error"))
if not payload:
pecan.abort(400, _('Body required'))
group_name = payload.get('group').get('name')
pecan.abort(400, _("Body required"))
group_name = payload.get("group").get("name")
if not group_name:
pecan.abort(400, _('Group name required'))
pecan.abort(400, _("Group name required"))
try:
# Insert the group into DB tables
@ -196,25 +196,25 @@ class GroupsController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to create group'))
pecan.abort(500, _("Unable to create group"))
@index.when(method='PUT', template='json')
@index.when(method="PUT", template="json")
def put(self, group_ref=None):
"""Update a existing group."""
context = restcomm.extract_context_from_environ()
if group_ref is None:
pecan.abort(400, _('Group ID required'))
pecan.abort(400, _("Group ID required"))
# Convert JSON string in request to Python dict
try:
payload = json.loads(request.body)
except ValueError:
pecan.abort(400, _('Request body decoding error'))
pecan.abort(400, _("Request body decoding error"))
if not payload:
pecan.abort(400, _('Body required'))
pecan.abort(400, _("Body required"))
try:
# Update the group in DB tables
@ -225,4 +225,4 @@ class GroupsController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to update group'))
pecan.abort(500, _("Unable to update group"))

View File

@ -38,7 +38,7 @@ LOG = logging.getLogger(__name__)
class ProjectsController(object):
VERSION_ALIASES = {
'Stein': '1.0',
"Stein": "1.0",
}
def __init__(self):
@ -49,12 +49,12 @@ class ProjectsController(object):
version_cap = 1.0
return version_cap
@expose(generic=True, template='json')
@expose(generic=True, template="json")
def index(self):
# Route the request to specific methods with parameters
pass
@index.when(method='GET', template='json')
@index.when(method="GET", template="json")
def get(self, project_ref=None):
context = restcomm.extract_context_from_environ()
@ -72,9 +72,9 @@ class ProjectsController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to get project'))
pecan.abort(500, _("Unable to get project"))
@index.when(method='POST', template='json')
@index.when(method="POST", template="json")
def post(self):
"""Create a new project."""
@ -83,14 +83,14 @@ class ProjectsController(object):
try:
payload = json.loads(request.body)
except ValueError:
pecan.abort(400, _('Request body decoding error'))
pecan.abort(400, _("Request body decoding error"))
if not payload:
pecan.abort(400, _('Body required'))
project_name = payload.get('project').get('name')
pecan.abort(400, _("Body required"))
project_name = payload.get("project").get("name")
if not project_name:
pecan.abort(400, _('project name required'))
pecan.abort(400, _("project name required"))
try:
# Insert the project into DB tables
@ -100,25 +100,25 @@ class ProjectsController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to create project'))
pecan.abort(500, _("Unable to create project"))
@index.when(method='PUT', template='json')
@index.when(method="PUT", template="json")
def put(self, project_ref=None):
"""Update a existing project."""
context = restcomm.extract_context_from_environ()
if project_ref is None:
pecan.abort(400, _('Project ID required'))
pecan.abort(400, _("Project ID required"))
# Convert JSON string in request to Python dict
try:
payload = json.loads(request.body)
except ValueError:
pecan.abort(400, _('Request body decoding error'))
pecan.abort(400, _("Request body decoding error"))
if not payload:
pecan.abort(400, _('Body required'))
pecan.abort(400, _("Body required"))
try:
# Update the project in DB tables
@ -130,4 +130,4 @@ class ProjectsController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to update project'))
pecan.abort(500, _("Unable to update project"))

View File

@ -38,7 +38,7 @@ LOG = logging.getLogger(__name__)
class RolesController(object):
VERSION_ALIASES = {
'Stein': '1.0',
"Stein": "1.0",
}
def __init__(self):
@ -49,12 +49,12 @@ class RolesController(object):
version_cap = 1.0
return version_cap
@expose(generic=True, template='json')
@expose(generic=True, template="json")
def index(self):
# Route the request to specific methods with parameters
pass
@index.when(method='GET', template='json')
@index.when(method="GET", template="json")
def get(self, role_ref=None):
"""Get a list of roles."""
context = restcomm.extract_context_from_environ()
@ -72,9 +72,9 @@ class RolesController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to get role'))
pecan.abort(500, _("Unable to get role"))
@index.when(method='POST', template='json')
@index.when(method="POST", template="json")
def post(self):
"""Create a new role."""
@ -84,14 +84,14 @@ class RolesController(object):
try:
payload = json.loads(request.body)
except ValueError:
pecan.abort(400, _('Request body decoding error'))
pecan.abort(400, _("Request body decoding error"))
if not payload:
pecan.abort(400, _('Body required'))
role_name = payload.get('role').get('name')
pecan.abort(400, _("Body required"))
role_name = payload.get("role").get("name")
if not role_name:
pecan.abort(400, _('role name required'))
pecan.abort(400, _("role name required"))
try:
# Insert the role into DB tables
@ -101,25 +101,25 @@ class RolesController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to create role'))
pecan.abort(500, _("Unable to create role"))
@index.when(method='PUT', template='json')
@index.when(method="PUT", template="json")
def put(self, role_ref=None):
"""Update a existing role."""
context = restcomm.extract_context_from_environ()
if role_ref is None:
pecan.abort(400, _('Role ID required'))
pecan.abort(400, _("Role ID required"))
# Convert JSON string in request to Python dict
try:
payload = json.loads(request.body)
except ValueError:
pecan.abort(400, _('Request body decoding error'))
pecan.abort(400, _("Request body decoding error"))
if not payload:
pecan.abort(400, _('Body required'))
pecan.abort(400, _("Body required"))
try:
# Update the role in DB tables
@ -131,4 +131,4 @@ class RolesController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to update role'))
pecan.abort(500, _("Unable to update role"))

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2019-2021 Wind River Systems, Inc.
# Copyright (c) 2019-2021, 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -45,8 +45,9 @@ class IdentityController(object):
res_controllers["groups"] = identity.GroupsController
res_controllers["projects"] = project.ProjectsController
res_controllers["roles"] = role.RolesController
res_controllers["token-revocation-events"] = \
res_controllers["token-revocation-events"] = (
token_revoke_event.RevokeEventsController
)
for name, ctrl in res_controllers.items():
setattr(self, name, ctrl)

View File

@ -39,7 +39,7 @@ LOG = logging.getLogger(__name__)
class RevokeEventsController(object):
VERSION_ALIASES = {
'Stein': '1.0',
"Stein": "1.0",
}
def __init__(self):
@ -50,12 +50,12 @@ class RevokeEventsController(object):
version_cap = 1.0
return version_cap
@expose(generic=True, template='json')
@expose(generic=True, template="json")
def index(self):
# Route the request to specific methods with parameters
pass
@index.when(method='POST', template='json')
@index.when(method="POST", template="json")
def post(self):
"""Create a new token revoke event."""
@ -65,10 +65,10 @@ class RevokeEventsController(object):
try:
payload = json.loads(request.body)
except ValueError:
pecan.abort(400, _('Request body decoding error'))
pecan.abort(400, _("Request body decoding error"))
if not payload:
pecan.abort(400, _('Body required'))
pecan.abort(400, _("Body required"))
try:
# Insert the token revoke event into DB tables
@ -78,9 +78,9 @@ class RevokeEventsController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to create token revocation event'))
pecan.abort(500, _("Unable to create token revocation event"))
@index.when(method='GET', template='json')
@index.when(method="GET", template="json")
def get(self):
"""Get all of token revoke events."""
context = restcomm.extract_context_from_environ()
@ -90,7 +90,7 @@ class RevokeEventsController(object):
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to get token revocation events'))
pecan.abort(500, _("Unable to get token revocation events"))
def _get_resource_controller(self, remainder):
if not remainder:
@ -121,126 +121,124 @@ class UsersController(object):
def __init__(self):
super(UsersController, self).__init__()
@expose(generic=True, template='json')
@expose(generic=True, template="json")
def index(self):
# Route the request to specific methods with parameters
pass
@index.when(method='GET', template='json')
@index.when(method="GET", template="json")
def get(self, event_id=None):
"""Get a token revoke event by user_id and issued_before."""
context = restcomm.extract_context_from_environ()
if event_id is None:
pecan.abort(400, _('Event ID required'))
pecan.abort(400, _("Event ID required"))
try:
# user specific event id is in the format of
# <user_id>_<issued_before> and encoded in base64
event_ref = base64.urlsafe_b64decode(event_id).decode('utf-8')
event_tags = event_ref.split('_')
event_ref = base64.urlsafe_b64decode(event_id).decode("utf-8")
event_tags = event_ref.split("_")
user_id = event_tags[0]
issued_before = event_tags[1]
revoke_event = db_api.\
revoke_event_get_by_user(context, user_id=user_id,
issued_before=issued_before)
revoke_event = db_api.revoke_event_get_by_user(
context, user_id=user_id, issued_before=issued_before
)
return revoke_event
except (IndexError, TypeError):
pecan.abort(404, _('Invalid event ID format'))
pecan.abort(404, _("Invalid event ID format"))
except exceptions.RevokeEventNotFound:
unique_id = "user_id {} and issued_before {}".\
format(user_id, issued_before)
pecan.abort(404, _("Token revocation event %s doesn't exist.")
% unique_id)
unique_id = "user_id {} and issued_before {}".format(user_id, issued_before)
pecan.abort(404, _("Token revocation event %s doesn't exist.") % unique_id)
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to get token revocation event'))
pecan.abort(500, _("Unable to get token revocation event"))
@index.when(method='DELETE')
@index.when(method="DELETE")
def delete(self, event_id=None):
"""Delete a token revoke event by user_id and issued_before."""
context = restcomm.extract_context_from_environ()
if event_id is None:
pecan.abort(400, _('Event ID required'))
pecan.abort(400, _("Event ID required"))
try:
# user specific event id is in the format of
# <user_id>_<issued_before> and encoded in base64
event_ref = base64.urlsafe_b64decode(event_id).decode('utf-8')
event_tags = event_ref.split('_')
event_ref = base64.urlsafe_b64decode(event_id).decode("utf-8")
event_tags = event_ref.split("_")
user_id = event_tags[0]
issued_before = event_tags[1]
db_api.revoke_event_delete_by_user(context, user_id=user_id,
issued_before=issued_before)
response.headers['Content-Type'] = None
db_api.revoke_event_delete_by_user(
context, user_id=user_id, issued_before=issued_before
)
response.headers["Content-Type"] = None
except (IndexError, TypeError):
pecan.abort(404, _('Invalid event ID format'))
pecan.abort(404, _("Invalid event ID format"))
except exceptions.RevokeEventNotFound:
unique_id = "user_id {} and issued_before {}".\
format(user_id, issued_before)
pecan.abort(404, _("Token revocation event %s doesn't exist.")
% unique_id)
unique_id = "user_id {} and issued_before {}".format(user_id, issued_before)
pecan.abort(404, _("Token revocation event %s doesn't exist.") % unique_id)
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to delete token revocation event'))
pecan.abort(500, _("Unable to delete token revocation event"))
class AuditsController(object):
def __init__(self):
super(AuditsController, self).__init__()
@expose(generic=True, template='json')
@expose(generic=True, template="json")
def index(self):
# Route the request to specific methods with parameters
pass
@index.when(method='GET', template='json')
@index.when(method="GET", template="json")
def get(self, audit_id=None):
"""Get a token revoke event by revocation_event.audit_id."""
context = restcomm.extract_context_from_environ()
if audit_id is None:
pecan.abort(400, _('Audit ID required'))
pecan.abort(400, _("Audit ID required"))
try:
revoke_event = db_api.\
revoke_event_get_by_audit(context, audit_id=audit_id)
revoke_event = db_api.revoke_event_get_by_audit(context, audit_id=audit_id)
return revoke_event
except exceptions.RevokeEventNotFound:
pecan.abort(404, _("Token revocation event with id %s"
" doesn't exist.") % audit_id)
pecan.abort(
404, _("Token revocation event with id %s doesn't exist.") % audit_id
)
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to get token revocation event'))
pecan.abort(500, _("Unable to get token revocation event"))
@index.when(method='DELETE')
@index.when(method="DELETE")
def delete(self, audit_id=None):
"""Delete a token revoke event by revocation_event.audit_id."""
context = restcomm.extract_context_from_environ()
if audit_id is None:
pecan.abort(400, _('Audit ID required'))
pecan.abort(400, _("Audit ID required"))
try:
db_api.revoke_event_delete_by_audit(context, audit_id=audit_id)
response.headers['Content-Type'] = None
response.headers["Content-Type"] = None
except exceptions.RevokeEventNotFound:
pecan.abort(404, _("Token revocation event with id %s"
" doesn't exist.") % audit_id)
pecan.abort(
404, _("Token revocation event with id %s doesn't exist.") % audit_id
)
except Exception as e:
LOG.exception(e)
pecan.abort(500, _('Unable to delete token revocation event'))
pecan.abort(500, _("Unable to delete token revocation event"))

View File

@ -35,7 +35,7 @@ class Controller(object):
remainder = remainder[:-1]
sub_controllers = dict()
if minor_version == '0':
if minor_version == "0":
sub_controllers["identity"] = root.IdentityController
for name, ctrl in sub_controllers.items():

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Wind River Systems, Inc.
# Copyright (c) 2022, 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -10,6 +10,4 @@ from dcdbsync.api.policies import base
def list_rules():
return itertools.chain(
base.list_rules()
)
return itertools.chain(base.list_rules())

View File

@ -1,28 +1,26 @@
#
# Copyright (c) 2022 Wind River Systems, Inc.
# Copyright (c) 2022, 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from oslo_policy import policy
ADMIN_IN_SYSTEM_PROJECTS = 'admin_in_system_projects'
READER_IN_SYSTEM_PROJECTS = 'reader_in_system_projects'
ADMIN_IN_SYSTEM_PROJECTS = "admin_in_system_projects"
READER_IN_SYSTEM_PROJECTS = "reader_in_system_projects"
base_rules = [
policy.RuleDefault(
name=ADMIN_IN_SYSTEM_PROJECTS,
check_str='role:admin and (project_name:admin or ' +
'project_name:services)',
check_str="role:admin and (project_name:admin or project_name:services)",
description="Base rule.",
),
policy.RuleDefault(
name=READER_IN_SYSTEM_PROJECTS,
check_str='role:reader and (project_name:admin or ' +
'project_name:services)',
description="Base rule."
)
check_str="role:reader and (project_name:admin or project_name:services)",
description="Base rule.",
),
]

View File

@ -36,7 +36,7 @@ def reset():
_ENFORCER = None
def init(policy_file='policy.yaml'):
def init(policy_file="policy.yaml"):
"""Init an Enforcer class.
:param policy_file: Custom policy file to be used.
@ -47,11 +47,13 @@ def init(policy_file='policy.yaml'):
if not _ENFORCER:
# https://docs.openstack.org/oslo.policy/latest/user/usage.html
_ENFORCER = policy.Enforcer(CONF,
_ENFORCER = policy.Enforcer(
CONF,
policy_file=policy_file,
default_rule='default',
default_rule="default",
use_conf=True,
overwrite=True)
overwrite=True,
)
_ENFORCER.register_defaults(controller_policies.list_rules())
return _ENFORCER
@ -59,5 +61,6 @@ def init(policy_file='policy.yaml'):
def authorize(rule, target, creds, do_raise=True):
"""A wrapper around 'authorize' from 'oslo_policy.policy'."""
init()
return _ENFORCER.authorize(rule, target, creds, do_raise=do_raise,
exc=exc.HTTPForbidden)
return _ENFORCER.authorize(
rule, target, creds, do_raise=do_raise, exc=exc.HTTPForbidden
)

View File

@ -23,7 +23,7 @@ modules = [
]
# List of modules that are already formatted with black
formatted_modules = ["dccommon"]
formatted_modules = ["dccommon", "dcdbsync/api"]
# Function to run black check