Refactor action validation functions

* Removes duplication of code in validate
* Breaks large validation functions into atomic, reusable pieces in
  super classes.
* Started applying the rule where validation does not leave any side
  effects on class members other than the valid boolean.
* Validate functions are not called after the first fail, this
  gives less feedback to user, but is probably reasonable general
  assumption as some validation relies on others passing.
* Adding some tests for NewUser validation.
* Refactored how the FakeManager handled resource objects or
  ids being passed to it.
* Fixed some issues with logs being spat out during tests.

Change-Id: Iea0afce06e92d8f1a1bda0cc03a32c00909828d1
This commit is contained in:
Dale Smith 2016-10-20 12:32:51 +01:00 committed by adrian-turjak
parent 47527734e2
commit 9d8adc726a
14 changed files with 720 additions and 555 deletions

View File

@ -25,16 +25,21 @@ LOGGING:
class: logging.FileHandler
filename: reg_log.log
loggers:
stacktask:
handlers:
- file
level: INFO
propagate: False
django:
handlers:
- file
level: INFO
propagate: True
propagate: False
keystonemiddleware:
handlers:
- file
level: INFO
propagate: True
propagate: False
EMAIL_SETTINGS:
EMAIL_BACKEND: django.core.mail.backends.console.EmailBackend

View File

@ -93,7 +93,7 @@ class BaseAction(object):
Sets up required data as fields.
"""
self.logger = getLogger('django.request')
self.logger = getLogger('stacktask')
for field in self.required:
field_data = data[field]
@ -172,11 +172,90 @@ class BaseAction(object):
return self.__class__.__name__
class UserAction(BaseAction):
"""
Base action for dealing with users.
Contains role utility functions.
"""
class ResourceMixin(object):
"""Base Mixin class for dealing with Openstack resources."""
def _validate_keystone_user(self):
keystone_user = self.action.task.keystone_user
if keystone_user['project_domain_id'] != self.domain_id:
self.add_note('Domain id does not match keystone user domain.')
return False
if keystone_user['project_id'] != self.project_id:
self.add_note('Project id does not match keystone user project.')
return False
return True
def _validate_domain_id(self):
id_manager = user_store.IdentityManager()
domain = id_manager.get_domain(self.domain_id)
if not domain:
self.add_note('Domain does not exist.')
return False
return True
def _validate_project_id(self):
# Handle an edge_case where some actions set their
# own project_id value.
if not self.project_id:
self.add_note('No project_id given.')
return False
# Now actually check the project exists.
id_manager = user_store.IdentityManager()
project = id_manager.get_project(self.project_id)
if not project:
self.add_note('Project with id %s does not exist.' %
self.project_id)
return False
self.add_note('Project with id %s exists.' % self.project_id)
return True
def _validate_domain_name(self):
id_manager = user_store.IdentityManager()
self.domain = id_manager.find_domain(self.domain_name)
if not self.domain:
self.add_note('Domain does not exist.')
return False
return True
class UserMixin(ResourceMixin):
"""Mixin with functions for users."""
def _validate_username_exists(self):
id_manager = user_store.IdentityManager()
self.user = id_manager.find_user(self.username, self.domain.id)
if not self.user:
self.add_note('No user present with username')
return False
return True
def _grant_roles(self, user, roles, project_id):
id_manager = user_store.IdentityManager()
ks_roles = []
for role in roles:
ks_role = id_manager.find_role(role)
if ks_role:
ks_roles.append(ks_role)
else:
raise TypeError("Keystone missing role: %s" % role)
for role in ks_roles:
id_manager.add_user_role(user, role, project_id)
def _validate_role_permissions(self):
keystone_user = self.action.task.keystone_user
# Role permissions check
if not self.are_roles_managable(user_roles=keystone_user['roles'],
requested_roles=self.roles):
self.add_note('User does not have permission to edit role(s).')
return False
return True
def are_roles_managable(self, user_roles=[], requested_roles=[]):
requested_roles = set(requested_roles)
@ -192,7 +271,52 @@ class UserAction(BaseAction):
return intersection == requested_roles
class UserIdAction(UserAction):
class ProjectMixin(ResourceMixin):
"""Mixin with functions for projects."""
def _validate_parent_project(self):
id_manager = user_store.IdentityManager()
# NOTE(adriant): If parent id is None, Keystone defaults to the domain.
# So we only care to validate if parent_id is not None.
if self.parent_id:
parent = id_manager.get_project(self.parent_id)
if not parent:
self.add_note("Parent id: '%s' does not exist." %
self.project_name)
return False
return True
def _validate_project_absent(self):
id_manager = user_store.IdentityManager()
project = id_manager.find_project(
self.project_name, self.domain_id)
if project:
self.add_note("Existing project with name '%s'." %
self.project_name)
return False
self.add_note("No existing project with name '%s'." %
self.project_name)
return True
def _create_project(self):
id_manager = user_store.IdentityManager()
try:
project = id_manager.create_project(
self.project_name, created_on=str(timezone.now()),
parent=self.parent_id, domain=self.domain_id)
except Exception as e:
self.add_note(
"Error: '%s' while creating project: %s" %
(e, self.project_name))
raise
# put project_id into action cache:
self.action.task.cache['project_id'] = project.id
self.set_cache('project_id', project.id)
self.add_note("New project '%s' created." % project.name)
class UserIdAction(BaseAction):
def _get_target_user(self):
"""
@ -202,10 +326,9 @@ class UserIdAction(UserAction):
user = id_manager.get_user(self.user_id)
return user
pass
class UserNameAction(UserAction):
class UserNameAction(BaseAction):
"""
Base action for dealing with users. Removes username if
USERNAME_IS_EMAIL and sets email to be username.
@ -236,7 +359,7 @@ class UserNameAction(UserAction):
return user
class NewUserAction(UserNameAction):
class NewUserAction(UserNameAction, ProjectMixin, UserMixin):
"""
Setup a new user with a role on the given project.
Creates the user if they don't exist, otherwise
@ -252,59 +375,29 @@ class NewUserAction(UserNameAction):
'domain_id',
]
def _validate(self):
def _validate_targer_user(self):
id_manager = user_store.IdentityManager()
# Default state is invalid
self.action.valid = False
keystone_user = self.action.task.keystone_user
if keystone_user['project_domain_id'] != self.domain_id:
self.add_note('Domain id does not match keystone user domain.')
return
if keystone_user['project_id'] != self.project_id:
self.add_note('Project id does not match keystone user project.')
return
# Role permissions check
if not self.are_roles_managable(user_roles=keystone_user['roles'],
requested_roles=self.roles):
self.add_note('User does not have permission to edit role(s).')
return
domain = id_manager.get_domain(self.domain_id)
if not domain:
self.add_note('Domain does not exist.')
return
project = id_manager.get_project(self.project_id)
if not project:
self.add_note('Project does not exist.')
return
# Get target user
# check if user exists and is valid
# this may mean we need a token.
user = self._get_target_user()
if not user:
self.action.need_token = True
self.set_token_fields(["password"])
self.add_note(
'No user present with username. Need to create new user.')
self.action.valid = True
return
return True
if user.email != self.email:
self.add_note(
'Found matching username, but email did not match.' +
'Reporting as invalid.')
return
return False
roles = id_manager.get_roles(user, project)
roles = {role.name for role in roles}
missing = set(self.roles) - roles
# role_validation
roles = id_manager.get_roles(user, self.project_id)
role_names = {role.name for role in roles}
missing = set(self.roles) - role_names
if not missing:
self.action.valid = True
self.action.need_token = False
self.action.state = "complete"
self.add_note(
@ -312,24 +405,32 @@ class NewUserAction(UserNameAction):
)
else:
self.roles = list(missing)
self.action.valid = True
self.action.need_token = True
self.set_token_fields(["confirm"])
self.action.state = "existing"
self.add_note(
'Existing user with matching email missing roles.')
return True
def _validate(self):
self.action.valid = (
self._validate_role_permissions() and
self._validate_keystone_user() and
self._validate_domain_id() and
self._validate_project_id() and
self._validate_targer_user()
)
self.action.save()
def _pre_approve(self):
self._validate()
self.action.save()
def _post_approve(self):
self._validate()
self.action.save()
def _submit(self, token_data):
self._validate()
self.action.save()
if not self.valid:
return
@ -339,21 +440,12 @@ class NewUserAction(UserNameAction):
if self.action.state == "default":
# default action: Create a new user in the tenant and add roles
try:
roles = []
for role in self.roles:
ks_role = id_manager.find_role(role)
if ks_role:
roles.append(ks_role)
else:
raise TypeError("Keystone missing role: %s" % role)
user = id_manager.create_user(
name=self.username, password=token_data['password'],
email=self.email, domain=self.domain_id,
created_on=str(timezone.now()))
for role in roles:
id_manager.add_user_role(user, role, self.project_id)
self._grant_roles(user, self.roles, self.project_id)
except Exception as e:
self.add_note(
"Error: '%s' while creating user: %s with roles: %s" %
@ -368,12 +460,7 @@ class NewUserAction(UserNameAction):
try:
user = id_manager.find_user(self.username, self.domain_id)
roles = []
for role in self.roles:
roles.append(id_manager.find_role(role))
for role in roles:
id_manager.add_user_role(user, role, self.project_id)
self._grant_roles(user, self.roles, self.project_id)
except Exception as e:
self.add_note(
"Error: '%s' while attaching user: %s with roles: %s" %
@ -390,70 +477,8 @@ class NewUserAction(UserNameAction):
% (self.username, self.roles, self.project_id))
class ProjectCreateBase(object):
"""Mixin with functions for project creation."""
def _validate_domain(self):
domain = self.id_manager.get_domain(self.domain_id)
if not domain:
self.add_note('Domain does not exist.')
return False
return True
def _validate_parent_project(self):
# NOTE(adriant): If parent id is None, Keystone defaults to the domain.
# So we only care to validate if parent_id is not None.
if self.parent_id:
parent = self.id_manager.get_project(self.parent_id)
if not parent:
self.add_note("Parent id: '%s' does not exist." %
self.project_name)
return False
return True
def _validate_project(self):
project = self.id_manager.find_project(
self.project_name, self.domain_id)
if project:
self.add_note("Existing project with name '%s'." %
self.project_name)
return False
self.add_note("No existing project with name '%s'." %
self.project_name)
return True
def _create_project(self):
try:
project = self.id_manager.create_project(
self.project_name, created_on=str(timezone.now()),
parent=self.parent_id, domain=self.domain_id)
except Exception as e:
self.add_note(
"Error: '%s' while creating project: %s" %
(e, self.project_name))
raise
# put project_id into action cache:
self.action.task.cache['project_id'] = project.id
self.set_cache('project_id', project.id)
self.add_note("New project '%s' created." % project.name)
def _grant_roles(self, user, roles, project_id):
ks_roles = []
for role in roles:
ks_role = self.id_manager.find_role(role)
if ks_role:
ks_roles.append(ks_role)
else:
raise TypeError("Keystone missing role: %s" % role)
for role in ks_roles:
self.id_manager.add_user_role(user, role, project_id)
# TODO(adriant): Write tests for this action.
class NewProjectAction(BaseAction, ProjectCreateBase):
class NewProjectAction(BaseAction, ProjectMixin, UserMixin):
"""
Creates a new project for the current keystone_user.
@ -468,23 +493,22 @@ class NewProjectAction(BaseAction, ProjectCreateBase):
def __init__(self, *args, **kwargs):
super(NewProjectAction, self).__init__(*args, **kwargs)
self.id_manager = user_store.IdentityManager()
def _validate(self):
self.action.valid = (
self._validate_domain() and
self._validate_domain_id() and
self._validate_parent_project() and
self._validate_project())
self._validate_project_absent())
self.action.save()
def _validate_domain(self):
def _validate_domain_id(self):
keystone_user = self.action.task.keystone_user
if keystone_user['project_domain_id'] != self.domain_id:
self.add_note('Domain id does not match keystone user domain.')
return False
return super(NewProjectAction, self)._validate_domain()
return super(NewProjectAction, self)._validate_domain_id()
def _validate_parent_project(self):
if self.parent_id:
@ -525,7 +549,8 @@ class NewProjectAction(BaseAction, ProjectCreateBase):
keystone_user = self.action.task.keystone_user
try:
user = self.id_manager.get_user(keystone_user['user_id'])
id_manager = user_store.IdentityManager()
user = id_manager.get_user(keystone_user['user_id'])
self._grant_roles(user, default_roles, project_id)
except Exception as e:
@ -550,7 +575,7 @@ class NewProjectAction(BaseAction, ProjectCreateBase):
pass
class NewProjectWithUserAction(UserNameAction, ProjectCreateBase):
class NewProjectWithUserAction(UserNameAction, ProjectMixin, UserMixin):
"""
Makes a new project for the given username. Will create the user if it
doesn't exists.
@ -566,18 +591,18 @@ class NewProjectWithUserAction(UserNameAction, ProjectCreateBase):
def __init__(self, *args, **kwargs):
super(NewProjectWithUserAction, self).__init__(*args, **kwargs)
self.id_manager = user_store.IdentityManager()
def _validate(self):
self.action.valid = (
self._validate_domain() and
self._validate_domain_id() and
self._validate_parent_project() and
self._validate_project() and
self._validate_project_absent() and
self._validate_user())
self.action.save()
def _validate_user(self):
user = self.id_manager.find_user(self.username, self.domain_id)
id_manager = user_store.IdentityManager()
user = id_manager.find_user(self.username, self.domain_id)
if user:
if user.email == self.email:
@ -630,9 +655,9 @@ class NewProjectWithUserAction(UserNameAction, ProjectCreateBase):
self.add_note("Project already created.")
else:
self.action.valid = (
self._validate_domain() and
self._validate_domain_id() and
self._validate_parent_project() and
self._validate_project())
self._validate_project_absent())
self.action.save()
if not self.valid:
@ -640,6 +665,7 @@ class NewProjectWithUserAction(UserNameAction, ProjectCreateBase):
self._create_project()
# User validation and checks
user_id = self.get_cache('user_id')
if user_id:
self.action.task.cache['user_id'] = user_id
@ -651,53 +677,57 @@ class NewProjectWithUserAction(UserNameAction, ProjectCreateBase):
if not self.valid:
return
default_roles = settings.ACTION_SETTINGS.get(
'NewProjectAction', {}).get("default_roles", {})
self._create_user_for_project()
project_id = self.get_cache('project_id')
def _create_user_for_project(self):
id_manager = user_store.IdentityManager()
default_roles = settings.ACTION_SETTINGS.get(
'NewProjectAction', {}).get("default_roles", {})
if self.action.state == "default":
try:
# Generate a temporary password:
password = uuid4().hex + uuid4().hex
project_id = self.get_cache('project_id')
user = self.id_manager.create_user(
name=self.username, password=password,
email=self.email, domain=self.domain_id,
created_on=str(timezone.now()))
if self.action.state == "default":
try:
# Generate a temporary password:
password = uuid4().hex + uuid4().hex
self._grant_roles(user, default_roles, project_id)
except Exception as e:
self.add_note(
"Error: '%s' while creating user: %s with roles: %s" %
(e, self.username, default_roles))
raise
user = id_manager.create_user(
name=self.username, password=password,
email=self.email, domain=self.domain_id,
created_on=str(timezone.now()))
# put user_id into action cache:
self.action.task.cache['user_id'] = user.id
self.set_cache('user_id', user.id)
self._grant_roles(user, default_roles, project_id)
except Exception as e:
self.add_note(
"New user '%s' created for project %s with roles: %s" %
(self.username, project_id, default_roles))
elif self.action.state == "existing":
try:
user = self.id_manager.find_user(
self.username, self.domain_id)
"Error: '%s' while creating user: %s with roles: %s" %
(e, self.username, default_roles))
raise
self._grant_roles(user, default_roles, project_id)
except Exception as e:
self.add_note(
"Error: '%s' while attaching user: %s with roles: %s" %
(e, self.username, default_roles))
raise
# put user_id into action cache:
self.action.task.cache['user_id'] = user.id
self.set_cache('user_id', user.id)
self.add_note(
"New user '%s' created for project %s with roles: %s" %
(self.username, project_id, default_roles))
elif self.action.state == "existing":
try:
user = id_manager.find_user(
self.username, self.domain_id)
# put user_id into action cache:
self.action.task.cache['user_id'] = user.id
self.set_cache('user_id', user.id)
self.add_note(("Existing user '%s' attached to project %s" +
" with roles: %s")
% (self.username, project_id,
default_roles))
self._grant_roles(user, default_roles, project_id)
except Exception as e:
self.add_note(
"Error: '%s' while attaching user: %s with roles: %s" %
(e, self.username, default_roles))
raise
# put user_id into action cache:
self.action.task.cache['user_id'] = user.id
self.set_cache('user_id', user.id)
self.add_note(("Existing user '%s' attached to project %s" +
" with roles: %s")
% (self.username, project_id,
default_roles))
def _submit(self, token_data):
"""
@ -716,11 +746,12 @@ class NewProjectWithUserAction(UserNameAction, ProjectCreateBase):
self.action.task.cache['project_id'] = project_id
user_id = self.get_cache('user_id')
self.action.task.cache['user_id'] = user_id
id_manager = user_store.IdentityManager()
if self.action.state == "default":
user = self.id_manager.get_user(user_id)
user = id_manager.get_user(user_id)
try:
self.id_manager.update_user_password(
id_manager.update_user_password(
user, token_data['password'])
except Exception as e:
self.add_note(
@ -736,7 +767,8 @@ class NewProjectWithUserAction(UserNameAction, ProjectCreateBase):
user_id, project_id))
class ResetUserAction(UserNameAction):
# TODO: rename to ResetUserPasswordAction
class ResetUserAction(UserNameAction, UserMixin):
"""
Simple action to reset a password for a given user.
"""
@ -753,21 +785,10 @@ class ResetUserAction(UserNameAction):
blacklist = settings.ACTION_SETTINGS.get(
'ResetUserAction', {}).get("blacklisted_roles", {})
def _validate(self):
def _validate_user_roles(self):
id_manager = user_store.IdentityManager()
self.domain = id_manager.find_domain(self.domain_name)
if not self.domain:
self.add_note('Domain does not exist.')
return False
self.user = id_manager.find_user(self.username, self.domain.id)
if not self.user:
valid = False
self.add_note('No user present with username')
return valid
roles = id_manager.get_all_roles(self.user)
user_roles = []
@ -775,36 +796,41 @@ class ResetUserAction(UserNameAction):
user_roles.extend(role.name for role in roles)
if set(self.blacklist) & set(user_roles):
valid = False
self.add_note('Cannot reset users with blacklisted roles.')
elif self.user.email == self.email:
valid = True
return False
if self.user.email == self.email:
self.action.need_token = True
self.set_token_fields(["password"])
self.add_note('Existing user with matching email.')
return True
else:
valid = False
self.add_note('Existing user with non-matching email.')
return False
return valid
def _validate(self):
# Here, the order of validation matters
# as each one adds new class variables
self.action.valid = (
self._validate_domain_name() and
self._validate_username_exists() and
self._validate_user_roles()
)
self.action.save()
def _pre_approve(self):
self.action.valid = self._validate()
self.action.save()
self._validate()
def _post_approve(self):
self.action.valid = self._validate()
self.action.save()
self._validate()
def _submit(self, token_data):
self.action.valid = self._validate()
self.action.save()
self._validate()
if not self.valid:
return
id_manager = user_store.IdentityManager()
try:
id_manager.update_user_password(self.user, token_data['password'])
except Exception as e:
@ -815,7 +841,7 @@ class ResetUserAction(UserNameAction):
self.add_note('User %s password has been changed.' % self.username)
class EditUserRolesAction(UserIdAction):
class EditUserRolesAction(UserIdAction, ProjectMixin, UserMixin):
"""
A class for adding or removing roles
on a user for the given project.
@ -829,89 +855,64 @@ class EditUserRolesAction(UserIdAction):
'remove'
]
def _validate(self):
id_manager = user_store.IdentityManager()
keystone_user = self.action.task.keystone_user
if keystone_user['project_domain_id'] != self.domain_id:
self.add_note('Domain id does not match keystone user domain.')
self.action.valid = False
return
domain = id_manager.get_domain(self.domain_id)
if not domain:
self.add_note('Domain does not exist.')
self.action.valid = False
return
# TODO: This is potentially too limiting and could be changed into
# an auth check. Perhaps just allowing 'admin' role users is enough.
if not keystone_user['project_id'] == self.project_id:
self.add_note('Project id does not match keystone user project.')
self.action.valid = False
return
# Role permissions check
if not self.are_roles_managable(user_roles=keystone_user['roles'],
requested_roles=self.roles):
self.add_note('User does not have permission to edit role(s).')
self.action.valid = False
return
project = id_manager.get_project(self.project_id)
if not project:
self.add_note('Project does not exist.')
self.action.valid = False
return
def _validate_target_user(self):
# Get target user
user = self._get_target_user()
if not user:
self.add_note('No user present with user_id')
self.action.valid = False
return
return False
return True
def _validate_user_roles(self):
id_manager = user_store.IdentityManager()
user = self._get_target_user()
project = id_manager.get_project(self.project_id)
# user roles
current_roles = id_manager.get_roles(user, project)
current_role_names = {role.name for role in current_roles}
if self.remove:
remaining = set(current_role_names) & set(self.roles)
if not remaining:
self.action.valid = True
self.action.state = "complete"
self.add_note(
"User doesn't have roles."
)
"User doesn't have roles to remove.")
else:
self.roles = list(remaining)
self.action.valid = True
self.add_note(
'User has roles to remove.')
else:
missing = set(self.roles) - set(current_role_names)
if not missing:
self.action.valid = True
self.action.state = "complete"
self.add_note(
'User already has roles.'
)
'User already has roles.')
else:
self.roles = list(missing)
self.action.valid = True
self.add_note(
'User user missing roles.')
# All paths are valid here
# We've just set state and roles that need to be changed.
return True
def _validate(self):
self.action.valid = (
self._validate_keystone_user() and
self._validate_role_permissions() and
self._validate_domain_id() and
self._validate_project_id() and
self._validate_target_user() and
self._validate_user_roles()
)
self.action.save()
def _pre_approve(self):
self._validate()
self.action.save()
def _post_approve(self):
self._validate()
self.action.save()
def _submit(self, token_data):
self._validate()
self.action.save()
if not self.valid:
return

View File

@ -12,15 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from stacktask.actions.models import BaseAction
from stacktask.actions.models import BaseAction, ProjectMixin, UserMixin
from stacktask.actions.tenant_setup import serializers
from django.conf import settings
from stacktask.actions.user_store import IdentityManager
from stacktask.actions import openstack_clients
from stacktask.actions import openstack_clients, user_store
import six
class NewDefaultNetworkAction(BaseAction):
class NewDefaultNetworkAction(BaseAction, ProjectMixin):
"""
This action will setup all required basic networking
resources so that a new user can launch instances
@ -33,56 +32,59 @@ class NewDefaultNetworkAction(BaseAction):
'region',
]
def _validate(self):
# Default state is invalid
self.action.valid = False
if not self.project_id:
self.add_note('No project_id given.')
return
def __init__(self, *args, **kwargs):
super(NewDefaultNetworkAction, self).__init__(*args, **kwargs)
def _validate_region(self):
if not self.region:
self.add_note('No region given.')
return
self.add_note('ERROR: No region given.')
return False
id_manager = user_store.IdentityManager()
region = id_manager.find_region(self.region)
if not region:
self.add_note('ERROR: Region does not exist.')
return False
self.add_note('Region: %s exists.' % self.region)
return True
def _validate_defaults(self):
defaults = settings.ACTION_SETTINGS.get(
'NewDefaultNetworkAction', {}).get(self.region, {})
if not defaults:
self.add_note('ERROR: No default settings for region %s.' %
self.region)
return False
return True
def _validate_keystone_user(self):
keystone_user = self.action.task.keystone_user
if keystone_user.get('project_id') != self.project_id:
self.add_note('Project id does not match keystone user project.')
return
return False
return True
id_manager = IdentityManager()
project = id_manager.get_project(self.project_id)
if not project:
self.add_note('Project does not exist.')
return
self.add_note('Project_id: %s exists.' % project.id)
region = id_manager.find_region(self.region)
if not region:
self.add_note('Region does not exist.')
return
self.add_note('Region: %s exists.' % self.region)
self.defaults = settings.ACTION_SETTINGS.get(
'NewDefaultNetworkAction', {}).get(self.region, {})
if not self.defaults:
self.add_note('ERROR: No default settings for given region.')
return
self.action.valid = True
def _validate(self):
self.action.valid = (
self._validate_region() and
self._validate_project_id() and
self._validate_defaults() and
self._validate_keystone_user()
)
self.action.save()
def _create_network(self):
neutron = openstack_clients.get_neutronclient(region=self.region)
defaults = settings.ACTION_SETTINGS.get(
'NewDefaultNetworkAction', {}).get(self.region, {})
if not self.get_cache('network_id'):
try:
network_body = {
"network": {
"name": self.defaults['network_name'],
"name": defaults['network_name'],
'tenant_id': self.project_id,
"admin_state_up": True
}
@ -91,15 +93,15 @@ class NewDefaultNetworkAction(BaseAction):
except Exception as e:
self.add_note(
"Error: '%s' while creating network: %s" %
(e, self.defaults['network_name']))
(e, defaults['network_name']))
raise
self.set_cache('network_id', network['network']['id'])
self.add_note("Network %s created for project %s" %
(self.defaults['network_name'],
(defaults['network_name'],
self.project_id))
else:
self.add_note("Network %s already created for project %s" %
(self.defaults['network_name'],
(defaults['network_name'],
self.project_id))
if not self.get_cache('subnet_id'):
@ -109,8 +111,8 @@ class NewDefaultNetworkAction(BaseAction):
"network_id": self.get_cache('network_id'),
"ip_version": 4,
'tenant_id': self.project_id,
'dns_nameservers': self.defaults['DNS_NAMESERVERS'],
"cidr": self.defaults['SUBNET_CIDR']
'dns_nameservers': defaults['DNS_NAMESERVERS'],
"cidr": defaults['SUBNET_CIDR']
}
}
subnet = neutron.create_subnet(body=subnet_body)
@ -120,18 +122,18 @@ class NewDefaultNetworkAction(BaseAction):
raise
self.set_cache('subnet_id', subnet['subnet']['id'])
self.add_note("Subnet created for network %s" %
self.defaults['network_name'])
defaults['network_name'])
else:
self.add_note("Subnet already created for network %s" %
self.defaults['network_name'])
defaults['network_name'])
if not self.get_cache('router_id'):
try:
router_body = {
"router": {
"name": self.defaults['router_name'],
"name": defaults['router_name'],
"external_gateway_info": {
"network_id": self.defaults['public_network']
"network_id": defaults['public_network']
},
'tenant_id': self.project_id,
"admin_state_up": True
@ -141,7 +143,7 @@ class NewDefaultNetworkAction(BaseAction):
except Exception as e:
self.add_note(
"Error: '%s' while creating router: %s" %
(e, self.defaults['router_name']))
(e, defaults['router_name']))
raise
self.set_cache('router_id', router['router']['id'])
self.add_note("Router created for project %s" %
@ -163,12 +165,12 @@ class NewDefaultNetworkAction(BaseAction):
self.add_note("Interface added to router for subnet")
def _pre_approve(self):
# Note: Do we need to get this from cache? it is a required setting
# self.project_id = self.action.task.cache.get('project_id', None)
self._validate()
self.action.save()
def _post_approve(self):
self._validate()
self.action.save()
if self.setup_network and self.valid:
self._create_network()
@ -189,84 +191,33 @@ class NewProjectDefaultNetworkAction(NewDefaultNetworkAction):
]
def _pre_validate(self):
# Default state is invalid
self.action.valid = False
# We don't check project here as it doesn't exist yet.
if not self.region:
self.add_note('No region given.')
return
id_manager = IdentityManager()
region = id_manager.find_region(self.region)
if not region:
self.add_note('Region does not exist.')
return
self.add_note('Region: %s exists.' % self.region)
self.defaults = settings.ACTION_SETTINGS.get(
'NewDefaultNetworkAction', {}).get(self.region, {})
if not self.defaults:
self.add_note('ERROR: No default settings for given region.')
return
self.action.valid = True
# Note: Don't check project here as it doesn't exist yet.
self.action.valid = (
self._validate_region() and
self._validate_defaults()
)
self.action.save()
def _validate(self):
# Default state is invalid
self.action.valid = False
self.project_id = self.action.task.cache.get('project_id', None)
if not self.project_id:
self.add_note('No project_id given.')
return
if not self.region:
self.add_note('No region given.')
return
id_manager = IdentityManager()
project = id_manager.get_project(self.project_id)
if not project:
self.add_note('Project does not exist.')
return
self.add_note('Project_id: %s exists.' % project.id)
region = id_manager.find_region(self.region)
if not region:
self.add_note('Region does not exist.')
return
self.add_note('Region: %s exists.' % self.region)
self.defaults = settings.ACTION_SETTINGS.get(
'NewDefaultNetworkAction', {}).get(self.region, {})
if not self.defaults:
self.add_note('ERROR: No default settings for given region.')
return
self.action.valid = True
self.action.valid = (
self._validate_region() and
self._validate_project_id() and
self._validate_defaults()
)
self.action.save()
def _pre_approve(self):
self._pre_validate()
self.action.save()
def _post_approve(self):
self.project_id = self.action.task.cache.get('project_id', None)
self._validate()
self.action.save()
if self.setup_network and self.valid:
self._create_network()
class AddDefaultUsersToProjectAction(BaseAction):
class AddDefaultUsersToProjectAction(BaseAction, ProjectMixin, UserMixin):
"""
The purpose of this action is to add a given set of users after
the creation of a new Project. This is mainly for administrative
@ -278,14 +229,15 @@ class AddDefaultUsersToProjectAction(BaseAction):
'domain_id',
]
def _validate_users(self):
def __init__(self, *args, **kwargs):
self.users = settings.ACTION_SETTINGS.get(
'AddDefaultUsersToProjectAction', {}).get('default_users', [])
self.roles = settings.ACTION_SETTINGS.get(
'AddDefaultUsersToProjectAction', {}).get('default_roles', [])
super(AddDefaultUsersToProjectAction, self).__init__(*args, **kwargs)
id_manager = IdentityManager()
def _validate_users(self):
id_manager = user_store.IdentityManager()
all_found = True
for user in self.users:
ks_user = id_manager.find_user(user, self.domain_id)
@ -295,59 +247,40 @@ class AddDefaultUsersToProjectAction(BaseAction):
self.add_note('ERROR: User: %s does not exist.' % user)
all_found = False
for role in self.roles:
ks_role = id_manager.find_role(role)
if ks_role:
self.add_note('Role: %s exists.' % role)
else:
self.add_note('ERROR: Role: %s does not exist.' % role)
all_found = False
return all_found
if all_found:
return True
else:
return False
def _validate_project(self):
self.project_id = self.action.task.cache.get('project_id', None)
id_manager = IdentityManager()
project = id_manager.get_project(self.project_id)
if not project:
self.add_note('Project does not exist.')
return False
self.add_note('Project_id: %s exists.' % project.id)
return True
def _validate(self):
self.action.valid = self._validate_users() and self._validate_project()
self.action.save()
def _pre_approve(self):
def _pre_validate(self):
self.action.valid = self._validate_users()
self.action.save()
def _validate(self):
self.action.valid = (
self._validate_users() and
self._validate_project_id()
)
self.action.save()
def _pre_approve(self):
self._pre_validate()
def _post_approve(self):
id_manager = user_store.IdentityManager()
self.project_id = self.action.task.cache.get('project_id', None)
self._validate()
if self.valid and not self.action.state == "completed":
id_manager = IdentityManager()
project = id_manager.get_project(self.project_id)
try:
for user in self.users:
ks_user = id_manager.find_user(user, self.domain_id)
for role in self.roles:
ks_role = id_manager.find_role(name=role)
id_manager.add_user_role(ks_user, ks_role, project.id)
self.add_note(
'User: "%s" given role: %s on project: %s.' %
(ks_user.name, ks_role.name, project.id))
self._grant_roles(ks_user, self.roles, self.project_id)
self.add_note(
'User: "%s" given roles: %s on project: %s.' %
(ks_user.name, self.roles, self.project_id))
except Exception as e:
self.add_note(
"Error: '%s' while adding users to project: %s" %
(e, project.id))
(e, self.project_id))
raise
self.action.state = "completed"
self.action.save()
@ -401,7 +334,7 @@ class SetProjectQuotaAction(BaseAction):
'set it.')
return False
id_manager = IdentityManager()
id_manager = user_store.IdentityManager()
project = id_manager.get_project(self.project_id)
if not project:
self.add_note('Project with id %s does not exist.' %

View File

@ -123,8 +123,9 @@ def get_fake_cinderclient(region):
class ProjectSetupActionTests(TestCase):
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.' +
'openstack_clients.get_neutronclient',
@ -175,8 +176,9 @@ class ProjectSetupActionTests(TestCase):
self.assertEquals(len(neutron_cache['routers']), 1)
self.assertEquals(len(neutron_cache['subnets']), 1)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.' +
'openstack_clients.get_neutronclient',
@ -222,8 +224,9 @@ class ProjectSetupActionTests(TestCase):
self.assertEquals(len(neutron_cache['routers']), 0)
self.assertEquals(len(neutron_cache['subnets']), 0)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.' +
'openstack_clients.get_neutronclient',
@ -293,8 +296,9 @@ class ProjectSetupActionTests(TestCase):
self.assertEquals(len(neutron_cache['routers']), 1)
self.assertEquals(len(neutron_cache['subnets']), 1)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.' +
'openstack_clients.get_neutronclient',
@ -345,8 +349,9 @@ class ProjectSetupActionTests(TestCase):
self.assertEquals(len(neutron_cache['routers']), 1)
self.assertEquals(len(neutron_cache['subnets']), 1)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.' +
'openstack_clients.get_neutronclient',
@ -380,8 +385,9 @@ class ProjectSetupActionTests(TestCase):
self.assertEquals(len(neutron_cache['routers']), 0)
self.assertEquals(len(neutron_cache['subnets']), 0)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.' +
'openstack_clients.get_neutronclient',
@ -427,8 +433,9 @@ class ProjectSetupActionTests(TestCase):
self.assertEquals(len(neutron_cache['routers']), 0)
self.assertEquals(len(neutron_cache['subnets']), 0)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.' +
'openstack_clients.get_neutronclient',
@ -498,8 +505,9 @@ class ProjectSetupActionTests(TestCase):
self.assertEquals(len(neutron_cache['routers']), 1)
self.assertEquals(len(neutron_cache['subnets']), 1)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_add_default_users(self):
"""
Base case, adds admin user with admin role to project.
@ -534,11 +542,45 @@ class ProjectSetupActionTests(TestCase):
project = tests.temp_cache['projects']['test_project']
self.assertEquals(project.roles['user_id_0'], ['admin'])
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
def test_add_admin_reapprove(self):
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_add_default_users_invalid_project(self):
"""Add default users to a project that doesn't exist.
Action should become invalid at the post_approve state, it's ok if
the project isn't created yet during pre_approve.
"""
Ensure nothing happens or changes if rerun of approve.
project = mock.Mock()
project.id = 'test_project_id'
project.name = 'test_project'
project.domain = 'default'
project.roles = {}
setup_temp_cache({'test_project': project}, {})
task = Task.objects.create(
ip_address="0.0.0.0", keystone_user={'roles': ['admin']})
task.cache = {'project_id': "invalid_project_id"}
action = AddDefaultUsersToProjectAction(
{'domain_id': 'default'}, task=task, order=1)
action.pre_approve()
# No need to test project yet - it's ok if it doesn't exist
self.assertEquals(action.valid, True)
action.post_approve()
# Now the missing project should make the action invalid
self.assertEquals(action.valid, False)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_add_default_users_reapprove(self):
"""
Ensure nothing happens or changes during rerun of approve.
"""
project = mock.Mock()
project.id = 'test_project_id'
@ -571,8 +613,9 @@ class ProjectSetupActionTests(TestCase):
project = tests.temp_cache['projects']['test_project']
self.assertEquals(project.roles['user_id_0'], ['admin'])
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.' +
'openstack_clients.get_neutronclient',

View File

@ -217,6 +217,135 @@ class ActionTests(TestCase):
action.submit(token_data)
self.assertEquals(action.valid, False)
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_new_user_wrong_project(self):
"""
Existing user, valid project, project does not match keystone user.
Action should be invalid.
"""
user = mock.Mock()
user.id = 'user_id'
user.name = "test@example.com"
user.email = "test@example.com"
user.domain = 'default'
project = mock.Mock()
project.id = 'test_project_id_1'
project.name = 'test_project'
project.domain = 'default'
project.roles = {user.id: ['_member_']}
setup_temp_cache({'test_project': project}, {user.id: user})
task = Task.objects.create(
ip_address="0.0.0.0",
keystone_user={
'roles': ['project_mod'],
'project_id': 'test_project_id',
'project_domain_id': 'default',
})
data = {
'email': 'test@example.com',
'project_id': 'test_project_id_1',
'roles': ['_member_'],
'domain_id': 'default',
}
action = NewUserAction(data, task=task, order=1)
action.pre_approve()
self.assertEquals(action.valid, False)
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_new_user_only_member(self):
"""
Existing user, valid project, no edit permissions.
Action should be invalid.
"""
user = mock.Mock()
user.id = 'user_id'
user.name = "test@example.com"
user.email = "test@example.com"
user.domain = 'default'
project = mock.Mock()
project.id = 'test_project_id'
project.name = 'test_project'
project.domain = 'default'
project.roles = {user.id: ['_member_']}
setup_temp_cache({'test_project': project}, {user.id: user})
task = Task.objects.create(
ip_address="0.0.0.0",
keystone_user={
'roles': ['_member_'],
'project_id': 'test_project_id',
'project_domain_id': 'default',
})
data = {
'email': 'test@example.com',
'project_id': 'test_project_id',
'roles': ['_member_'],
'domain_id': 'default',
}
action = NewUserAction(data, task=task, order=1)
action.pre_approve()
self.assertFalse(action.valid)
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_new_user_wrong_domain(self):
"""
Existing user, valid project, invalid domain.
Action should be invalid.
"""
user = mock.Mock()
user.id = 'user_id'
user.name = "test@example.com"
user.email = "test@example.com"
user.domain = 'default'
project = mock.Mock()
project.id = 'test_project_id'
project.name = 'test_project'
project.domain = 'default'
project.roles = {user.id: ['_member_']}
setup_temp_cache({'test_project': project}, {user.id: user})
task = Task.objects.create(
ip_address="0.0.0.0",
keystone_user={
'roles': ['_member_'],
'project_id': 'test_project_id',
'project_domain_id': 'default',
})
data = {
'email': 'test@example.com',
'project_id': 'test_project_id',
'roles': ['_member_'],
'domain_id': 'not_default',
}
action = NewUserAction(data, task=task, order=1)
action.pre_approve()
self.assertFalse(action.valid)
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_new_project(self):
@ -335,7 +464,7 @@ class ActionTests(TestCase):
FakeManager)
def test_new_project_existing_user(self):
"""
no project, existing user.
Create a project for a user that already exists.
"""
user = mock.Mock()
@ -390,9 +519,9 @@ class ActionTests(TestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_new_project_existing(self):
def test_new_project_existing_project(self):
"""
Existing project.
Create a project that already exists.
"""
project = mock.Mock()
@ -428,7 +557,37 @@ class ActionTests(TestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_reset_user(self):
def test_new_project_invalid_domain_id(self):
""" Create a project using an invalid domain """
setup_temp_cache({}, {})
task = Task.objects.create(
ip_address="0.0.0.0",
keystone_user={
'roles': ['admin', 'project_mod'],
'project_id': 'test_project_id',
'project_domain_id': 'default',
})
data = {
'domain_id': 'not_default_id',
'parent_id': None,
'email': 'test@example.com',
'project_name': 'test_project',
}
action = NewProjectWithUserAction(data, task=task, order=1)
action.pre_approve()
self.assertEquals(action.valid, False)
action.post_approve()
self.assertEquals(action.valid, False)
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_reset_user_password(self):
"""
Base case, existing user.
"""
@ -474,9 +633,9 @@ class ActionTests(TestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_reset_user_no_user(self):
def test_reset_user_password_no_user(self):
"""
No user.
Reset password for a non-existant user.
"""
setup_temp_cache({}, {})
@ -509,7 +668,7 @@ class ActionTests(TestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_edit_user_add(self):
def test_edit_user_roles_add(self):
"""
Add roles to existing user.
"""
@ -561,7 +720,7 @@ class ActionTests(TestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_edit_user_add_complete(self):
def test_edit_user_roles_add_complete(self):
"""
Add roles to existing user.
"""
@ -614,7 +773,7 @@ class ActionTests(TestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_edit_user_remove(self):
def test_edit_user_roles_remove(self):
"""
Remove roles from existing user.
"""
@ -665,9 +824,9 @@ class ActionTests(TestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
def test_edit_user_remove_complete(self):
def test_edit_user_roles_remove_complete(self):
"""
Remove roles from existing user.
Remove roles from user that does not have them.
"""
user = mock.Mock()

View File

@ -46,9 +46,9 @@ class IdentityManager(object):
def __init__(self):
self.ks_client = get_keystoneclient()
def find_user(self, name, domain_id):
def find_user(self, name, domain):
try:
users = self.ks_client.users.list(name=name, domain=domain_id)
users = self.ks_client.users.list(name=name, domain=domain)
if users:
# NOTE(adriant) usernames are unique in a domain
return users[0]
@ -134,22 +134,22 @@ class IdentityManager(object):
return projects
def add_user_role(self, user, role, project_id):
def add_user_role(self, user, role, project):
try:
self.ks_client.roles.grant(role, user=user, project=project_id)
self.ks_client.roles.grant(role, user=user, project=project)
except ks_exceptions.Conflict:
# Conflict is ok, it means the user already has this role.
pass
def remove_user_role(self, user, role, project_id):
self.ks_client.roles.revoke(role, user=user, project=project_id)
def remove_user_role(self, user, role, project):
self.ks_client.roles.revoke(role, user=user, project=project)
def find_project(self, project_name, domain_id):
def find_project(self, project_name, domain):
try:
# Using a filtered list as find is more efficient than
# using the client find
projects = self.ks_client.projects.list(
name=project_name, domain=domain_id)
name=project_name, domain=domain)
if projects:
# NOTE(adriant) project names are unique in a domain so
# it is safe to assume filtering on project name and domain

View File

@ -1,17 +0,0 @@
# Copyright (C) 2015 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from stacktask.api.v1.tests import *
# Just pipes the tests to the tests folder to keep things clean.

View File

@ -59,32 +59,37 @@ def setup_temp_cache(projects, users):
}
class FakeProject():
def __init__(self, project):
self.id = project.id
self.name = project.name
self.roles = project.roles
def list_users(self):
global temp_cache
usernames = []
for username, roles in self.roles.iteritems():
if roles:
usernames.append(username)
users = []
for user in temp_cache['users'].values():
if user.name in usernames:
users.append(user)
return users
class FakeManager(object):
def find_user(self, name, domain_id):
def _project_from_id(self, project):
if isinstance(project, mock.Mock):
return project
else:
return self.get_project(project)
def _role_from_id(self, role):
if isinstance(role, mock.Mock):
return role
else:
return self.get_role(role)
def _user_from_id(self, user):
if isinstance(user, mock.Mock):
return user
else:
return self.get_user(user)
def _domain_from_id(self, domain):
if isinstance(domain, mock.Mock):
return domain
else:
return self.get_domain(domain)
def find_user(self, name, domain):
domain = self._domain_from_id(domain)
global temp_cache
for user in temp_cache['users'].values():
if user.name == name and user.domain == domain_id:
if user.name == name and user.domain == domain.id:
return user
return None
@ -93,6 +98,7 @@ class FakeManager(object):
return temp_cache['users'].get(user_id, None)
def list_users(self, project):
project = self._project_from_id(project)
global temp_cache
roles = temp_cache['projects'][project.name].roles
users = []
@ -109,6 +115,8 @@ class FakeManager(object):
def create_user(self, name, password, email, created_on,
domain='default', default_project=None):
domain = self._project_from_id(domain)
default_project = self._project_from_id(default_project)
global temp_cache
user = mock.Mock()
user.id = "user_id_%s" % int(temp_cache['i'])
@ -123,8 +131,7 @@ class FakeManager(object):
return user
def update_user_password(self, user, password):
global temp_cache
user = temp_cache['users'][user.id]
user = self._user_from_id(user)
user.password = password
def find_role(self, name):
@ -136,7 +143,8 @@ class FakeManager(object):
return None
def get_roles(self, user, project):
global temp_cache
user = self._user_from_id(user)
project = self._project_from_id(project)
try:
roles = []
for role in project.roles[user.id]:
@ -148,6 +156,7 @@ class FakeManager(object):
return []
def get_all_roles(self, user):
user = self._user_from_id(user)
global temp_cache
projects = {}
for project in temp_cache['projects'].values():
@ -159,24 +168,29 @@ class FakeManager(object):
return projects
def add_user_role(self, user, role, project_id):
project = self.get_project(project_id)
def add_user_role(self, user, role, project):
user = self._user_from_id(user)
role = self._role_from_id(role)
project = self._project_from_id(project)
try:
project.roles[user.id].append(role.name)
except KeyError:
project.roles[user.id] = [role.name]
def remove_user_role(self, user, role, project_id):
project = self.get_project(project_id)
def remove_user_role(self, user, role, project):
user = self._user_from_id(user)
role = self._role_from_id(role)
project = self._project_from_id(project)
try:
project.roles[user.id].remove(role.name)
except KeyError:
pass
def find_project(self, project_name, domain_id):
def find_project(self, project_name, domain):
domain = self._domain_from_id(domain)
global temp_cache
for project in temp_cache['projects'].values():
if project.name == project_name and project.domain == domain_id:
if project.name == project_name and project.domain == domain.id:
return project
return None
@ -184,10 +198,11 @@ class FakeManager(object):
global temp_cache
for project in temp_cache['projects'].values():
if project.id == project_id:
return FakeProject(project)
return project
def create_project(self, project_name, created_on, parent=None,
domain='default', p_id=None):
parent = self._project_from_id(parent)
global temp_cache
project = mock.Mock()
if p_id:

View File

@ -59,8 +59,9 @@ class AdminAPITests(APITestCase):
@mock.patch(
'stacktask.actions.models.user_store.IdentityManager', FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_task_get(self):
"""
Test the basic task detail view.
@ -194,8 +195,9 @@ class AdminAPITests(APITestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_task_complete(self):
"""
Can't approve a completed task.
@ -228,8 +230,9 @@ class AdminAPITests(APITestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_task_update(self):
"""
Creates a invalid task.
@ -286,8 +289,9 @@ class AdminAPITests(APITestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_notification_acknowledge(self):
"""
Test that you can acknowledge a notification.
@ -337,8 +341,9 @@ class AdminAPITests(APITestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_notification_acknowledge_list(self):
"""
Test that you can acknowledge a list of notifications.
@ -550,8 +555,9 @@ class AdminAPITests(APITestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_cancel_task(self):
"""
Ensure the ability to cancel a task.
@ -588,8 +594,9 @@ class AdminAPITests(APITestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_cancel_task_sent_token(self):
"""
Ensure the ability to cancel a task after the token is sent.
@ -628,8 +635,9 @@ class AdminAPITests(APITestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_task_update_unapprove(self):
"""
Ensure task update doesn't work for approved actions.
@ -673,7 +681,8 @@ class AdminAPITests(APITestCase):
@mock.patch(
'stacktask.actions.models.user_store.IdentityManager', FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.IdentityManager', FakeManager)
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_cancel_task_own(self):
"""
Ensure the ability to cancel your own task.
@ -720,7 +729,8 @@ class AdminAPITests(APITestCase):
@mock.patch(
'stacktask.actions.models.user_store.IdentityManager', FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.IdentityManager', FakeManager)
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_cancel_task_own_fail(self):
"""
Ensure the ability to cancel ONLY your own task.
@ -862,8 +872,9 @@ class AdminAPITests(APITestCase):
@mock.patch(
'stacktask.actions.models.user_store.IdentityManager', FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_task_list_filter(self):
"""
"""
@ -1085,7 +1096,8 @@ class AdminAPITests(APITestCase):
@mock.patch(
'stacktask.actions.models.user_store.IdentityManager', FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.IdentityManager', FakeManager)
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_reset_admin(self):
"""
Ensure that you cannot issue a password reset for an

View File

@ -248,8 +248,9 @@ class TaskViewTests(APITestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_new_project(self):
"""
Ensure the new project workflow goes as expected.
@ -286,10 +287,12 @@ class TaskViewTests(APITestCase):
response = self.client.post(url, data, format='json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_new_project_existing(self):
"""
Test to ensure validation marks actions as invalid
@ -327,10 +330,12 @@ class TaskViewTests(APITestCase):
{'errors': ['Cannot approve an invalid task. ' +
'Update data and rerun pre_approve.']})
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_new_project_existing_user(self):
"""
Project created if not present, existing user attached.
@ -373,10 +378,12 @@ class TaskViewTests(APITestCase):
{'notes': ['Task completed successfully.']}
)
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_new_project_existing_project_new_user(self):
"""
Project already exists but new user attempting to create it.
@ -525,8 +532,9 @@ class TaskViewTests(APITestCase):
@mock.patch('stacktask.actions.models.user_store.IdentityManager',
FakeManager)
@mock.patch('stacktask.actions.tenant_setup.models.IdentityManager',
FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_notification_createproject(self):
"""
CreateProject should create a notification.
@ -560,7 +568,8 @@ class TaskViewTests(APITestCase):
@mock.patch(
'stacktask.actions.models.user_store.IdentityManager', FakeManager)
@mock.patch(
'stacktask.actions.tenant_setup.models.IdentityManager', FakeManager)
'stacktask.actions.tenant_setup.models.user_store.IdentityManager',
FakeManager)
def test_duplicate_tasks_new_project(self):
"""
Ensure we can't submit duplicate tasks

View File

@ -32,7 +32,7 @@ class APIViewWithLogger(APIView):
"""
def __init__(self, *args, **kwargs):
super(APIViewWithLogger, self).__init__(*args, **kwargs)
self.logger = getLogger('django.request')
self.logger = getLogger('stacktask')
class StatusView(APIViewWithLogger):

View File

@ -73,7 +73,7 @@ class RequestLoggingMiddleware(object):
"""
def __init__(self):
self.logger = getLogger('django.request')
self.logger = getLogger('stacktask')
def process_request(self, request):
self.logger.info(

View File

@ -37,15 +37,20 @@ LOGGING = {
},
},
'loggers': {
'stacktask': {
'handlers': ['file'],
'level': 'INFO',
'propagate': False,
},
'django': {
'handlers': ['file'],
'level': 'INFO',
'propagate': True,
'propagate': False,
},
'keystonemiddleware': {
'handlers': ['file'],
'level': 'INFO',
'propagate': True,
'propagate': False,
},
},
}

View File

@ -6,5 +6,5 @@ skipsdist = True
usedevelop = True
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = stacktask-api test stacktask
commands = stacktask-api test stacktask {posargs}
setenv = VIRTUAL_ENV={envdir}