diff --git a/test-requirements.txt b/test-requirements.txt index 5de00caad9..771b9479a6 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -22,3 +22,4 @@ testrepository>=0.0.18 # Apache-2.0/BSD pymongo>=3.0.2 # Apache-2.0 redis>=2.10.0 # MIT psycopg2>=2.5 # LGPL/ZPL +cassandra-driver>=2.1.4 # Apache-2.0 diff --git a/trove/common/cfg.py b/trove/common/cfg.py index b986d43921..450f15faf0 100644 --- a/trove/common/cfg.py +++ b/trove/common/cfg.py @@ -814,6 +814,11 @@ cassandra_opts = [ cfg.StrOpt('root_controller', default='trove.extensions.common.service.DefaultRootController', help='Root controller implementation for cassandra.'), + cfg.ListOpt('ignore_users', default=['os_admin'], + help='Users to exclude when listing users.'), + cfg.ListOpt('ignore_dbs', default=['system', 'system_auth', + 'system_traces'], + help='Databases to exclude when listing databases.'), cfg.StrOpt('guest_log_exposed_logs', default='', help='List of Guest Logs to expose for publishing.'), ] diff --git a/trove/guestagent/datastore/experimental/cassandra/manager.py b/trove/guestagent/datastore/experimental/cassandra/manager.py index f2d8036de1..99a48f6ab3 100644 --- a/trove/guestagent/datastore/experimental/cassandra/manager.py +++ b/trove/guestagent/datastore/experimental/cassandra/manager.py @@ -15,10 +15,14 @@ # import os +import yaml from oslo_log import log as logging from trove.guestagent.datastore.experimental.cassandra import service +from trove.guestagent.datastore.experimental.cassandra.service import ( + CassandraAdmin +) from trove.guestagent.datastore import manager from trove.guestagent import volume @@ -29,13 +33,21 @@ LOG = logging.getLogger(__name__) class Manager(manager.Manager): def __init__(self): - self.appStatus = service.CassandraAppStatus() - self.app = service.CassandraApp(self.appStatus) + self._app = service.CassandraApp() + self.__admin = CassandraAdmin(self.app.get_current_superuser()) super(Manager, self).__init__('cassandra') @property def status(self): - return self.appStatus + return self.app.status + + @property + def app(self): + return self._app + + @property + def admin(self): + return self.__admin def restart(self, context): self.app.restart() @@ -62,25 +74,73 @@ class Manager(manager.Manager): # FIXME(amrith) Once the cassandra bug # https://issues.apache.org/jira/browse/CASSANDRA-2356 # is fixed, this code may have to be revisited. - LOG.debug("Stopping database prior to changes.") + LOG.debug("Stopping database prior to initial configuration.") self.app.stop_db() if config_contents: - LOG.debug("Processing configuration.") - self.app.write_config(config_contents) + LOG.debug("Applying configuration.") + self.app.write_config(yaml.load(config_contents)) self.app.make_host_reachable() if device_path: + LOG.debug("Preparing data volume.") device = volume.VolumeDevice(device_path) # unmount if device is already mounted device.unmount_device(device_path) device.format() if os.path.exists(mount_point): # rsync exiting data + LOG.debug("Migrating existing data.") device.migrate_data(mount_point) # mount the volume - device.mount(mount_point) LOG.debug("Mounting new volume.") + device.mount(mount_point) - LOG.debug("Restarting database after changes.") - self.app.start_db() + LOG.debug("Starting database with configuration changes.") + self.app.start_db(update_db=False) + + if not self.app.has_user_config(): + LOG.debug("Securing superuser access.") + self.app.secure() + self.app.restart() + + self.__admin = CassandraAdmin(self.app.get_current_superuser()) + + def change_passwords(self, context, users): + self.admin.change_passwords(context, users) + + def update_attributes(self, context, username, hostname, user_attrs): + self.admin.update_attributes(context, username, hostname, user_attrs) + + def create_database(self, context, databases): + self.admin.create_database(context, databases) + + def create_user(self, context, users): + self.admin.create_user(context, users) + + def delete_database(self, context, database): + self.admin.delete_database(context, database) + + def delete_user(self, context, user): + self.admin.delete_user(context, user) + + def get_user(self, context, username, hostname): + return self.admin.get_user(context, username, hostname) + + def grant_access(self, context, username, hostname, databases): + self.admin.grant_access(context, username, hostname, databases) + + def revoke_access(self, context, username, hostname, database): + self.admin.revoke_access(context, username, hostname, database) + + def list_access(self, context, username, hostname): + return self.admin.list_access(context, username, hostname) + + def list_databases(self, context, limit=None, marker=None, + include_marker=False): + return self.admin.list_databases(context, limit, marker, + include_marker) + + def list_users(self, context, limit=None, marker=None, + include_marker=False): + return self.admin.list_users(context, limit, marker, include_marker) diff --git a/trove/guestagent/datastore/experimental/cassandra/service.py b/trove/guestagent/datastore/experimental/cassandra/service.py index 2c5379cb23..5369219180 100644 --- a/trove/guestagent/datastore/experimental/cassandra/service.py +++ b/trove/guestagent/datastore/experimental/cassandra/service.py @@ -14,26 +14,35 @@ # under the License. import os -import tempfile +import re +import stat +from cassandra.auth import PlainTextAuthProvider +from cassandra.cluster import Cluster +from cassandra.cluster import NoHostAvailable +from cassandra import OperationTimedOut from oslo_log import log as logging from oslo_utils import netutils -import yaml from trove.common import cfg from trove.common import exception from trove.common.i18n import _ from trove.common import instance as rd_instance +from trove.common import pagination +from trove.common.stream_codecs import IniCodec +from trove.common.stream_codecs import SafeYamlCodec from trove.common import utils +from trove.guestagent.common import guestagent_utils from trove.guestagent.common import operating_system from trove.guestagent.common.operating_system import FileMode -from trove.guestagent.datastore.experimental.cassandra import system from trove.guestagent.datastore import service +from trove.guestagent.db import models from trove.guestagent import pkg LOG = logging.getLogger(__name__) CONF = cfg.CONF +MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'cassandra' packager = pkg.Package() @@ -41,10 +50,65 @@ packager = pkg.Package() class CassandraApp(object): """Prepares DBaaS on a Guest container.""" - def __init__(self, status): + _ADMIN_USER = 'os_admin' + + _CONF_AUTH_SEC = 'authentication' + _CONF_USR_KEY = 'username' + _CONF_PWD_KEY = 'password' + _CONF_DIR_MODS = stat.S_IRWXU + _CONF_FILE_MODS = stat.S_IRUSR + + CASSANDRA_KILL_CMD = "sudo killall java || true" + + def __init__(self): """By default login with root no password for initial setup.""" self.state_change_wait_time = CONF.state_change_wait_time - self.status = status + self.status = CassandraAppStatus(self.get_current_superuser()) + + @property + def service_candidates(self): + return ['cassandra'] + + @property + def cassandra_conf(self): + return { + operating_system.REDHAT: + "/etc/cassandra/default.conf/cassandra.yaml", + operating_system.DEBIAN: + "/etc/cassandra/cassandra.yaml", + operating_system.SUSE: + "/etc/cassandra/default.conf/cassandra.yaml" + }[operating_system.get_os()] + + @property + def cassandra_owner(self): + return 'cassandra' + + @property + def cassandra_data_dir(self): + return guestagent_utils.build_file_path( + self.cassandra_working_dir, 'data') + + @property + def cassandra_working_dir(self): + return "/var/lib/cassandra" + + @property + def default_superuser_name(self): + return "cassandra" + + @property + def default_superuser_password(self): + return "cassandra" + + @property + def default_superuser_pwd_hash(self): + # Default 'salted_hash' value for 'cassandra' user on Cassandra 2.1. + return "$2a$10$wPEVuXBU7WE2Uwzqq3t19ObRJyoKztzC/Doyfr0VtDmVXC4GDAV3e" + + @property + def cqlsh_conf_path(self): + return "~/.cassandra/cqlshrc" def install_if_needed(self, packages): """Prepare the guest machine with a cassandra server installation.""" @@ -61,86 +125,125 @@ class CassandraApp(object): def start_db(self, update_db=False): self.status.start_db_service( - system.SERVICE_CANDIDATES, self.state_change_wait_time, + self.service_candidates, self.state_change_wait_time, enable_on_boot=True, update_db=update_db) def stop_db(self, update_db=False, do_not_start_on_reboot=False): self.status.stop_db_service( - system.SERVICE_CANDIDATES, self.state_change_wait_time, + self.service_candidates, self.state_change_wait_time, disable_on_boot=do_not_start_on_reboot, update_db=update_db) def restart(self): self.status.restart_db_service( - system.SERVICE_CANDIDATES, self.state_change_wait_time) + self.service_candidates, self.state_change_wait_time) def _install_db(self, packages): """Install cassandra server""" LOG.debug("Installing cassandra server.") - packager.pkg_install(packages, None, system.INSTALL_TIMEOUT) + packager.pkg_install(packages, None, 10000) LOG.debug("Finished installing Cassandra server") - def write_config(self, config_contents, - execute_function=utils.execute_with_timeout, - mkstemp_function=tempfile.mkstemp, - unlink_function=os.unlink): + def secure(self, update_user=None): + """Configure the Trove administrative user. + Update an existing user if given. + Create a new one using the default database credentials + otherwise and drop the built-in user when finished. + """ + LOG.info(_('Configuring Trove superuser.')) - # first securely create a temp file. mkstemp() will set - # os.O_EXCL on the open() call, and we get a file with - # permissions of 600 by default. - (conf_fd, conf_path) = mkstemp_function() + current_superuser = update_user or models.CassandraUser( + self.default_superuser_name, + self.default_superuser_password) - LOG.debug('Storing temporary configuration at %s.' % conf_path) + if update_user: + os_admin = models.CassandraUser(update_user.name, + utils.generate_random_password()) + CassandraAdmin(current_superuser).alter_user_password(os_admin) + else: + os_admin = models.CassandraUser(self._ADMIN_USER, + utils.generate_random_password()) + CassandraAdmin(current_superuser)._create_superuser(os_admin) + CassandraAdmin(os_admin).drop_user(current_superuser) - # write config and close the file, delete it if there is an - # error. only unlink if there is a problem. In normal course, - # we move the file. - try: - os.write(conf_fd, config_contents) - operating_system.move(conf_path, system.CASSANDRA_CONF, - as_root=True) - # TODO(denis_makogon): figure out the dynamic way to discover - # configs owner since it can cause errors if there is - # no cassandra user in operating system - operating_system.chown(system.CASSANDRA_CONF, - 'cassandra', 'cassandra', recursive=False, - as_root=True) - operating_system.chmod(system.CASSANDRA_CONF, - FileMode.ADD_READ_ALL, as_root=True) - except Exception: - LOG.exception( - _("Exception generating Cassandra configuration %s.") % - conf_path) - unlink_function(conf_path) - raise - finally: - os.close(conf_fd) + self.__create_cqlsh_config({self._CONF_AUTH_SEC: + {self._CONF_USR_KEY: os_admin.name, + self._CONF_PWD_KEY: os_admin.password}}) + + # Update the internal status with the new user. + self.status = CassandraAppStatus(os_admin) + + return os_admin + + def __create_cqlsh_config(self, sections): + config_path = self._get_cqlsh_conf_path() + config_dir = os.path.dirname(config_path) + if not os.path.exists(config_dir): + os.mkdir(config_dir, self._CONF_DIR_MODS) + else: + os.chmod(config_dir, self._CONF_DIR_MODS) + operating_system.write_file(config_path, sections, codec=IniCodec()) + os.chmod(config_path, self._CONF_FILE_MODS) + + def get_current_superuser(self): + """ + Build the Trove superuser. + Use the stored credentials. + If not available fall back to the defaults. + """ + if self.has_user_config(): + return self._load_current_superuser() + + LOG.warn(_("Trove administrative user has not been configured yet. " + "Using the built-in default: %s") + % self.default_superuser_name) + return models.CassandraUser(self.default_superuser_name, + self.default_superuser_password) + + def has_user_config(self): + """ + Return TRUE if there is a client configuration file available + on the guest. + """ + return os.path.exists(self._get_cqlsh_conf_path()) + + def _load_current_superuser(self): + config = operating_system.read_file(self._get_cqlsh_conf_path(), + codec=IniCodec()) + return models.CassandraUser( + config[self._CONF_AUTH_SEC][self._CONF_USR_KEY], + config[self._CONF_AUTH_SEC][self._CONF_PWD_KEY] + ) + + def write_config(self, config_contents): + + operating_system.write_file( + self.cassandra_conf, config_contents, codec=SafeYamlCodec(), + as_root=True) + operating_system.chown(self.cassandra_conf, + self.cassandra_owner, + self.cassandra_owner, + recursive=False, as_root=True) + operating_system.chmod(self.cassandra_conf, + FileMode.ADD_READ_ALL, as_root=True) LOG.info(_('Wrote new Cassandra configuration.')) - def read_conf(self): - """Returns cassandra.yaml in dict structure.""" - - LOG.debug("Opening cassandra.yaml.") - with open(system.CASSANDRA_CONF, 'r') as config: - LOG.debug("Preparing YAML object from cassandra.yaml.") - yamled = yaml.load(config.read()) - return yamled - def update_config_with_single(self, key, value): """Updates single key:value in 'cassandra.yaml'.""" - yamled = self.read_conf() + yamled = operating_system.read_file(self.cassandra_conf, + codec=SafeYamlCodec()) yamled.update({key: value}) LOG.debug("Updating cassandra.yaml with %(key)s: %(value)s." % {'key': key, 'value': value}) - dump = yaml.dump(yamled, default_flow_style=False) LOG.debug("Dumping YAML to stream.") - self.write_config(dump) + self.write_config(yamled) def update_conf_with_group(self, group): """Updates group of key:value in 'cassandra.yaml'.""" - yamled = self.read_conf() + yamled = operating_system.read_file(self.cassandra_conf, + codec=SafeYamlCodec()) for key, value in group.iteritems(): if key == 'seed': (yamled.get('seed_provider')[0]. @@ -150,9 +253,8 @@ class CassandraApp(object): yamled.update({key: value}) LOG.debug("Updating cassandra.yaml with %(key)s: %(value)s." % {'key': key, 'value': value}) - dump = yaml.dump(yamled, default_flow_style=False) LOG.debug("Dumping YAML to stream") - self.write_config(dump) + self.write_config(yamled) def make_host_reachable(self): updates = { @@ -180,22 +282,540 @@ class CassandraApp(object): LOG.debug("Resetting configuration") self.write_config(config_contents) + def _get_cqlsh_conf_path(self): + return os.path.expanduser(self.cqlsh_conf_path) + class CassandraAppStatus(service.BaseDbStatus): + def __init__(self, superuser): + """ + :param superuser: User account the Status uses for connecting + to the database. + :type superuser: CassandraUser + """ + super(CassandraAppStatus, self).__init__() + self.__user = superuser + + def set_superuser(self, user): + self.__user = user + def _get_actual_db_status(self): try: - # If status check would be successful, - # bot stdin and stdout would contain nothing - out, err = utils.execute_with_timeout(system.CASSANDRA_STATUS, - shell=True) - if "Connection error. Could not connect to" not in err: + with CassandraLocalhostConnection(self.__user): return rd_instance.ServiceStatuses.RUNNING - else: - return rd_instance.ServiceStatuses.SHUTDOWN - except (exception.ProcessExecutionError, OSError): - LOG.exception(_("Error getting Cassandra status")) + except NoHostAvailable: return rd_instance.ServiceStatuses.SHUTDOWN + except Exception: + LOG.exception(_("Error getting Cassandra status.")) + + return rd_instance.ServiceStatuses.SHUTDOWN def cleanup_stalled_db_services(self): - utils.execute_with_timeout(system.CASSANDRA_KILL, shell=True) + utils.execute_with_timeout(CassandraApp.CASSANDRA_KILL_CMD, shell=True) + + +class CassandraAdmin(object): + """Handles administrative tasks on the Cassandra database. + + In Cassandra only SUPERUSERS can create other users and grant permissions + to database resources. Trove uses the 'cassandra' superuser to perform its + administrative tasks. + + The users it creates are all 'normal' (NOSUPERUSER) accounts. + The permissions it can grant are also limited to non-superuser operations. + This is to prevent anybody from creating a new superuser via the Trove API. + """ + + # Non-superuser grant modifiers. + __NO_SUPERUSER_MODIFIERS = ('ALTER', 'CREATE', 'DROP', 'MODIFY', 'SELECT') + + _KS_NAME_REGEX = re.compile('^$') + + def __init__(self, user): + self.__admin_user = user + + def create_user(self, context, users): + """ + Create new non-superuser accounts. + New users are by default granted full access to all database resources. + """ + with CassandraLocalhostConnection(self.__admin_user) as client: + for item in users: + self._create_user_and_grant(client, + self._deserialize_user(item)) + + def _create_user_and_grant(self, client, user): + """ + Create new non-superuser account and grant it full access to its + databases. + """ + self._create_user(client, user) + for db in user.databases: + self._grant_full_access_on_keyspace( + client, self._deserialize_keyspace(db), user) + + def _create_user(self, client, user): + # Create only NOSUPERUSER accounts here. + LOG.debug("Creating a new user '%s'." % user.name) + client.execute("CREATE USER '{}' WITH PASSWORD %s NOSUPERUSER;", + (user.name,), (user.password,)) + + def _create_superuser(self, user): + """Create a new superuser account and grant it full superuser-level + access to all keyspaces. + """ + LOG.debug("Creating a new superuser '%s'." % user.name) + with CassandraLocalhostConnection(self.__admin_user) as client: + client.execute("CREATE USER '{}' WITH PASSWORD %s SUPERUSER;", + (user.name,), (user.password,)) + client.execute("GRANT ALL PERMISSIONS ON ALL KEYSPACES TO '{}';", + (user.name,)) + + def delete_user(self, context, user): + self.drop_user(self._deserialize_user(user)) + + def drop_user(self, user): + with CassandraLocalhostConnection(self.__admin_user) as client: + self._drop_user(client, user) + + def _drop_user(self, client, user): + LOG.debug("Deleting user '%s'." % user.name) + client.execute("DROP USER '{}';", (user.name, )) + + def get_user(self, context, username, hostname): + with CassandraLocalhostConnection(self.__admin_user) as client: + user = self._find_user(client, username) + return user.serialize() if user is not None else None + + def _find_user(self, client, username): + """ + Lookup a user with a given username. + Omit user names on the ignore list. + Return a new Cassandra user instance or None if no match is found. + """ + return next((user for user in self._get_listed_users(client) + if user.name == username), None) + + def list_users(self, context, limit=None, marker=None, + include_marker=False): + """ + List all non-superuser accounts. Omit names on the ignored list. + Return an empty set if None. + """ + with CassandraLocalhostConnection(self.__admin_user) as client: + users = [user.serialize() for user in + self._get_listed_users(client)] + return pagination.paginate_list(users, limit, marker, + include_marker) + + def _get_listed_users(self, client): + """ + Return a set of unique user instances. + Omit user names on the ignore list. + """ + return self._get_users( + client, lambda user: user.name not in self.ignore_users) + + def _get_users(self, client, matcher=None): + """ + :param matcher Filter expression. + :type matcher callable + """ + acl = self._get_acl(client) + return {self._build_user(user.name, acl) + for user in client.execute("LIST USERS;") + if not matcher or matcher(user)} + + def _load_user(self, client, username, check_reserved=True): + if check_reserved: + self._check_reserved_user_name(username) + + acl = self._get_acl(client, username=username) + return self._build_user(username, acl) + + def _build_user(self, username, acl): + user = models.CassandraUser(username) + for ks, permissions in acl.get(username, {}).items(): + if permissions: + user.databases.append(models.CassandraSchema(ks).serialize()) + return user + + def _get_acl(self, client, username=None): + """Return the ACL for a database user. + Return ACLs for all users if no particular username is specified. + + The ACL has the following format: + {username #1: + {keyspace #1: {access mod(s)...}, + keyspace #2: {...}}, + username #2: + {keyspace #1: {...}, + keyspace #3: {...}} + } + """ + + def build_list_query(username): + query_tokens = ["LIST ALL PERMISSIONS"] + if username: + query_tokens.extend(["OF", "'%s'" % username]) + query_tokens.append("NORECURSIVE;") + return ' '.join(query_tokens) + + def parse_keyspace_name(resource): + """Parse a keyspace name from a resource string. + The resource string has the following form: + + where 'object' is one of the database objects (keyspace, table...). + Return the name as a singleton set. Return an empty set if no match + is found. + """ + match = self._KS_NAME_REGEX.match(resource) + if match: + return {match.group(1)} + return {} + + def update_acl(username, keyspace, permission, acl): + permissions = acl.get(username, {}).get(keyspace) + if permissions is None: + guestagent_utils.update_dict({user: {keyspace: {permission}}}, + acl) + else: + permissions.add(permission) + + all_keyspace_names = None + acl = dict() + for item in client.execute(build_list_query(username)): + user = item.username + resource = item.resource + permission = item.permission + if user and resource and permission: + if resource == '': + # Cache the full keyspace list to improve performance and + # ensure consistent results for all users. + if all_keyspace_names is None: + all_keyspace_names = { + item.name + for item in self._get_available_keyspaces(client) + } + keyspaces = all_keyspace_names + else: + keyspaces = parse_keyspace_name(resource) + + for keyspace in keyspaces: + update_acl(user, keyspace, permission, acl) + + return acl + + def list_superusers(self): + """List all system users existing in the database.""" + with CassandraLocalhostConnection(self.__admin_user) as client: + return self._get_users(client, lambda user: user.super) + + def grant_access(self, context, username, hostname, databases): + """ + Grant full access on keyspaces to a given username. + """ + user = models.CassandraUser(username) + with CassandraLocalhostConnection(self.__admin_user) as client: + for db in databases: + self._grant_full_access_on_keyspace( + client, models.CassandraSchema(db), user) + + def revoke_access(self, context, username, hostname, database): + """ + Revoke all permissions on any database resources from a given username. + """ + user = models.CassandraUser(username) + with CassandraLocalhostConnection(self.__admin_user) as client: + self._revoke_all_access_on_keyspace( + client, models.CassandraSchema(database), user) + + def _grant_full_access_on_keyspace(self, client, keyspace, user, + check_reserved=True): + """ + Grant all non-superuser permissions on a keyspace to a given user. + """ + if check_reserved: + self._check_reserved_user_name(user.name) + self._check_reserved_keyspace_name(keyspace.name) + + for access in self.__NO_SUPERUSER_MODIFIERS: + self._grant_permission_on_keyspace(client, access, keyspace, user) + + def _grant_permission_on_keyspace(self, client, modifier, keyspace, user): + """ + Grant a non-superuser permission on a keyspace to a given user. + Raise an exception if the caller attempts to grant a superuser access. + """ + LOG.debug("Granting '%s' access on '%s' to user '%s'." + % (modifier, keyspace.name, user.name)) + if modifier in self.__NO_SUPERUSER_MODIFIERS: + client.execute("GRANT {} ON KEYSPACE \"{}\" TO '{}';", + (modifier, keyspace.name, user.name)) + else: + raise exception.UnprocessableEntity( + "Invalid permission modifier (%s). Allowed values are: '%s'" + % (modifier, ', '.join(self.__NO_SUPERUSER_MODIFIERS))) + + def _revoke_all_access_on_keyspace(self, client, keyspace, user, + check_reserved=True): + if check_reserved: + self._check_reserved_user_name(user.name) + self._check_reserved_keyspace_name(keyspace.name) + + LOG.debug("Revoking all permissions on '%s' from user '%s'." + % (keyspace.name, user.name)) + client.execute("REVOKE ALL PERMISSIONS ON KEYSPACE \"{}\" FROM '{}';", + (keyspace.name, user.name)) + + def update_attributes(self, context, username, hostname, user_attrs): + with CassandraLocalhostConnection(self.__admin_user) as client: + user = self._load_user(client, username) + new_name = user_attrs.get('name') + new_password = user_attrs.get('password') + self._update_user(client, user, new_name, new_password) + + def _update_user(self, client, user, new_username, new_password): + """ + Update a user of a given username. + Updatable attributes include username and password. + If a new username and password are given a new user with those + attributes is created and all permissions from the original + user get transfered to it. The original user is then dropped + therefore revoking its permissions. + If only new password is specified the existing user gets altered + with that password. + """ + if new_username is not None and user.name != new_username: + if new_password is not None: + self._rename_user(client, user, new_username, new_password) + else: + raise exception.UnprocessableEntity( + _("Updating username requires specifying a password " + "as well.")) + elif new_password is not None and user.password != new_password: + user.password = new_password + self._alter_user_password(client, user) + + def _rename_user(self, client, user, new_username, new_password): + """ + Rename a given user also updating its password. + Transfer the current permissions to the new username. + Drop the old username therefore revoking its permissions. + """ + LOG.debug("Renaming user '%s' to '%s'" % (user.name, new_username)) + new_user = models.CassandraUser(new_username, new_password) + new_user.databases.extend(user.databases) + self._create_user_and_grant(client, new_user) + self._drop_user(client, user) + + def alter_user_password(self, user): + with CassandraLocalhostConnection(self.__admin_user) as client: + self._alter_user_password(client, user) + + def change_passwords(self, context, users): + with CassandraLocalhostConnection(self.__admin_user) as client: + for user in users: + self._alter_user_password(client, self._deserialize_user(user)) + + def _alter_user_password(self, client, user): + LOG.debug("Changing password of user '%s'." % user.name) + client.execute("ALTER USER '{}' " + "WITH PASSWORD %s;", (user.name,), (user.password,)) + + def create_database(self, context, databases): + with CassandraLocalhostConnection(self.__admin_user) as client: + for item in databases: + self._create_single_node_keyspace( + client, self._deserialize_keyspace(item)) + + def _create_single_node_keyspace(self, client, keyspace): + """ + Create a single-replica keyspace. + + Cassandra stores replicas on multiple nodes to ensure reliability and + fault tolerance. All replicas are equally important; + there is no primary or master. + A replication strategy determines the nodes where + replicas are placed. SimpleStrategy is for a single data center only. + The total number of replicas across the cluster is referred to as the + replication factor. + + Replication Strategy: + 'SimpleStrategy' is not optimized for multiple data centers. + 'replication_factor' The number of replicas of data on multiple nodes. + Required for SimpleStrategy; otherwise, not used. + + Keyspace names are case-insensitive by default. + To make a name case-sensitive, enclose it in double quotation marks. + """ + client.execute("CREATE KEYSPACE \"{}\" WITH REPLICATION = " + "{{ 'class' : 'SimpleStrategy', " + "'replication_factor' : 1 }};", (keyspace.name,)) + + def delete_database(self, context, database): + with CassandraLocalhostConnection(self.__admin_user) as client: + self._drop_keyspace(client, self._deserialize_keyspace(database)) + + def _drop_keyspace(self, client, keyspace): + LOG.debug("Dropping keyspace '%s'." % keyspace.name) + client.execute("DROP KEYSPACE \"{}\";", (keyspace.name,)) + + def list_databases(self, context, limit=None, marker=None, + include_marker=False): + with CassandraLocalhostConnection(self.__admin_user) as client: + databases = [keyspace.serialize() for keyspace + in self._get_available_keyspaces(client)] + return pagination.paginate_list(databases, limit, marker, + include_marker) + + def _get_available_keyspaces(self, client): + """ + Return a set of unique keyspace instances. + Omit keyspace names on the ignore list. + """ + return {models.CassandraSchema(db.keyspace_name) + for db in client.execute("SELECT * FROM " + "system.schema_keyspaces;") + if db.keyspace_name not in self.ignore_dbs} + + def list_access(self, context, username, hostname): + with CassandraLocalhostConnection(self.__admin_user) as client: + user = self._find_user(client, username) + if user: + return user.databases + + raise exception.UserNotFound(username) + + def _deserialize_keyspace(self, keyspace_dict, check_reserved=True): + if keyspace_dict: + db = models.CassandraSchema.deserialize_schema(keyspace_dict) + if check_reserved: + self._check_reserved_keyspace_name(db.name) + + return db + + return None + + def _check_reserved_keyspace_name(self, name): + if name in self.ignore_dbs: + raise ValueError(_("This keyspace-name is reserved: %s") % name) + + def _deserialize_user(self, user_dict, check_reserved=True): + if user_dict: + user = models.CassandraUser.deserialize_user(user_dict) + if check_reserved: + self._check_reserved_user_name(user.name) + + return user + + return None + + def _check_reserved_user_name(self, name): + if name in self.ignore_users: + raise ValueError(_("This user-name is reserved: %s") % name) + + @property + def ignore_users(self): + return cfg.get_ignored_users(manager=MANAGER) + + @property + def ignore_dbs(self): + return cfg.get_ignored_dbs(manager=MANAGER) + + +class CassandraConnection(object): + """A wrapper to manage a Cassandra connection.""" + + # Cassandra 2.1 only supports protocol versions 3 and lower. + NATIVE_PROTOCOL_VERSION = 3 + + def __init__(self, contact_points, user): + self.__user = user + # A Cluster is initialized with a set of initial contact points. + # After the driver connects to one of the nodes it will automatically + # discover the rest. + # Will connect to '127.0.0.1' if None contact points are given. + self._cluster = Cluster( + contact_points=contact_points, + auth_provider=PlainTextAuthProvider(user.name, user.password), + protocol_version=self.NATIVE_PROTOCOL_VERSION) + self.__session = None + + def __enter__(self): + self.__connect() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.__disconnect() + + def execute(self, query, identifiers=None, data_values=None, timeout=None): + """ + Execute a query with a given sequence or dict of data values to bind. + If a sequence is used, '%s' should be used the placeholder for each + argument. If a dict is used, '%(name)s' style placeholders must + be used. + Only data values should be supplied this way. Other items, + such as keyspaces, table names, and column names should be set + ahead of time. Use the '{}' style placeholders and + 'identifiers' parameter for those. + Raise an exception if the operation exceeds the given timeout (sec). + There is no timeout if set to None. + Return a set of rows or an empty list if None. + """ + if self.__is_active(): + try: + rows = self.__session.execute(self.__bind(query, identifiers), + data_values, timeout) + return rows or [] + except OperationTimedOut: + LOG.error(_("Query execution timed out.")) + raise + + LOG.debug("Cannot perform this operation on a closed connection.") + raise exception.UnprocessableEntity() + + def __bind(self, query, identifiers): + if identifiers: + return query.format(*identifiers) + return query + + def __connect(self): + if not self._cluster.is_shutdown: + LOG.debug("Connecting to a Cassandra cluster as '%s'." + % self.__user.name) + if not self.__is_active(): + self.__session = self._cluster.connect() + else: + LOG.debug("Connection already open.") + LOG.debug("Connected to cluster: '%s'" + % self._cluster.metadata.cluster_name) + for host in self._cluster.metadata.all_hosts(): + LOG.debug("Connected to node: '%s' in rack '%s' at datacenter " + "'%s'" % (host.address, host.rack, host.datacenter)) + else: + LOG.debug("Cannot perform this operation on a terminated cluster.") + raise exception.UnprocessableEntity() + + def __disconnect(self): + if self.__is_active(): + try: + LOG.debug("Disconnecting from cluster: '%s'" + % self._cluster.metadata.cluster_name) + self._cluster.shutdown() + self.__session.shutdown() + except Exception: + LOG.debug("Failed to disconnect from a Cassandra cluster.") + + def __is_active(self): + return self.__session and not self.__session.is_shutdown + + +class CassandraLocalhostConnection(CassandraConnection): + """ + A connection to the localhost Cassandra server. + """ + + def __init__(self, user): + super(CassandraLocalhostConnection, self).__init__(None, user) diff --git a/trove/guestagent/datastore/experimental/cassandra/system.py b/trove/guestagent/datastore/experimental/cassandra/system.py deleted file mode 100644 index 481c397ac9..0000000000 --- a/trove/guestagent/datastore/experimental/cassandra/system.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright 2013 Mirantis Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from trove.common import cfg - -CONF = cfg.CONF - -SERVICE_CANDIDATES = ["cassandra"] - -CASSANDRA_DATA_DIR = "/var/lib/cassandra/data" -CASSANDRA_CONF = "/etc/cassandra/cassandra.yaml" -CASSANDRA_TEMP_CONF = "/tmp/cassandra.yaml" -CASSANDRA_TEMP_DIR = "/tmp/cassandra" - -CASSANDRA_STATUS = """echo "use system;" > /tmp/check; cqlsh -f /tmp/check""" - -CASSANDRA_KILL = "sudo killall java || true" -SERVICE_STOP_TIMEOUT = 60 -INSTALL_TIMEOUT = 10000 diff --git a/trove/guestagent/db/models.py b/trove/guestagent/db/models.py index f8be17d61a..8d239e6193 100644 --- a/trove/guestagent/db/models.py +++ b/trove/guestagent/db/models.py @@ -158,6 +158,36 @@ class MongoDBSchema(DatastoreSchema): return ['_name'] +class CassandraSchema(DatastoreSchema): + """Represents a Cassandra schema and its associated properties. + + Keyspace names are 32 or fewer alpha-numeric characters and underscores, + the first of which is an alpha character. + """ + + def __init__(self, name=None, deserializing=False): + super(CassandraSchema, self).__init__() + + if not (bool(deserializing) != bool(name)): + raise ValueError(_("Bad args. name: %(name)s, " + "deserializing %(deser)s.") + % ({'name': bool(name), + 'deser': bool(deserializing)})) + if not deserializing: + self.name = name + + @property + def _max_schema_name_length(self): + return 32 + + def _is_valid_schema_name(self, value): + return True + + @classmethod + def _dict_requirements(cls): + return ['_name'] + + class MySQLDatabase(Base): """Represents a Database and its properties.""" @@ -746,6 +776,45 @@ class MongoDBUser(DatastoreUser): return ['_name'] +class CassandraUser(DatastoreUser): + """Represents a Cassandra user and its associated properties.""" + + def __init__(self, name=None, password=None, deserializing=False): + super(CassandraUser, self).__init__() + + if ((not (bool(deserializing) != bool(name))) or + (bool(deserializing) and bool(password))): + raise ValueError(_("Bad args. name: %(name)s, " + "password %(pass)s, " + "deserializing %(deser)s.") + % ({'name': bool(name), + 'pass': bool(password), + 'deser': bool(deserializing)})) + if not deserializing: + self.name = name + self.password = password + + def _build_database_schema(self, name): + return CassandraSchema(name) + + @property + def _max_username_length(self): + return 65535 + + def _is_valid_name(self, value): + return True + + def _is_valid_host_name(self, value): + return True + + def _is_valid_password(self, value): + return True + + @classmethod + def _dict_requirements(cls): + return ['_name'] + + class MySQLUser(Base): """Represents a MySQL User and its associated properties.""" diff --git a/trove/templates/cassandra/config.template b/trove/templates/cassandra/config.template index 221009fc85..e369ff8e01 100644 --- a/trove/templates/cassandra/config.template +++ b/trove/templates/cassandra/config.template @@ -5,8 +5,8 @@ max_hint_window_in_ms: 10800000 hinted_handoff_throttle_in_kb: 1024 max_hints_delivery_threads: 2 batchlog_replay_throttle_in_kb: 1024 -authenticator: AllowAllAuthenticator -authorizer: AllowAllAuthorizer +authenticator: org.apache.cassandra.auth.PasswordAuthenticator +authorizer: org.apache.cassandra.auth.CassandraAuthorizer permissions_validity_in_ms: 2000 partitioner: org.apache.cassandra.dht.Murmur3Partitioner data_file_directories: @@ -38,11 +38,11 @@ trickle_fsync: false trickle_fsync_interval_in_kb: 10240 storage_port: 7000 ssl_storage_port: 7001 -listen_address: localhost +listen_address: 127.0.0.1 start_native_transport: true native_transport_port: 9042 start_rpc: true -rpc_address: localhost +rpc_address: 127.0.0.1 rpc_port: 9160 rpc_keepalive: true rpc_server_type: sync diff --git a/trove/tests/int_tests.py b/trove/tests/int_tests.py index 2a2b2f8cf5..76329a4619 100644 --- a/trove/tests/int_tests.py +++ b/trove/tests/int_tests.py @@ -185,6 +185,7 @@ register(["user"], user_actions_groups) register(["db2_supported"], common_groups, database_actions_groups, user_actions_groups) register(["cassandra_supported"], common_groups, + user_actions_groups, database_actions_groups, backup_groups, configuration_groups) register(["couchbase_supported"], common_groups, backup_groups, root_actions_groups) diff --git a/trove/tests/unittests/guestagent/test_cassandra_manager.py b/trove/tests/unittests/guestagent/test_cassandra_manager.py index b6019b1ff0..137e2be5a7 100644 --- a/trove/tests/unittests/guestagent/test_cassandra_manager.py +++ b/trove/tests/unittests/guestagent/test_cassandra_manager.py @@ -13,17 +13,25 @@ # under the License. import os +import random +import string +from mock import ANY +from mock import call from mock import MagicMock +from mock import NonCallableMagicMock from mock import patch from oslo_utils import netutils +from testtools import ExpectedException from trove.common.context import TroveContext +from trove.common import exception from trove.common.instance import ServiceStatuses from trove.guestagent.datastore.experimental.cassandra import ( manager as cass_manager) from trove.guestagent.datastore.experimental.cassandra import ( service as cass_service) +from trove.guestagent.db import models from trove.guestagent import pkg as pkg from trove.guestagent import volume from trove.tests.unittests import trove_testtools @@ -31,6 +39,32 @@ from trove.tests.unittests import trove_testtools class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase): + __N_GAK = '_get_available_keyspaces' + __N_GLU = '_get_listed_users' + __N_BU = '_build_user' + __N_RU = '_rename_user' + __N_AUP = '_alter_user_password' + __N_CAU = 'trove.guestagent.db.models.CassandraUser' + __N_CU = '_create_user' + __N_GFA = '_grant_full_access_on_keyspace' + __N_DU = '_drop_user' + + __ACCESS_MODIFIERS = ('ALTER', 'CREATE', 'DROP', 'MODIFY', 'SELECT') + __CREATE_DB_FORMAT = ( + "CREATE KEYSPACE \"{}\" WITH REPLICATION = " + "{{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};" + ) + __DROP_DB_FORMAT = "DROP KEYSPACE \"{}\";" + __CREATE_USR_FORMAT = "CREATE USER '{}' WITH PASSWORD %s NOSUPERUSER;" + __ALTER_USR_FORMAT = "ALTER USER '{}' WITH PASSWORD %s;" + __DROP_USR_FORMAT = "DROP USER '{}';" + __GRANT_FORMAT = "GRANT {} ON KEYSPACE \"{}\" TO '{}';" + __REVOKE_FORMAT = "REVOKE ALL PERMISSIONS ON KEYSPACE \"{}\" FROM '{}';" + __LIST_PERMISSIONS_FORMAT = "LIST ALL PERMISSIONS NORECURSIVE;" + __LIST_PERMISSIONS_OF_FORMAT = "LIST ALL PERMISSIONS OF '{}' NORECURSIVE;" + __LIST_DB_FORMAT = "SELECT * FROM system.schema_keyspaces;" + __LIST_USR_FORMAT = "LIST USERS;" + def setUp(self): super(GuestAgentCassandraDBManagerTest, self).setUp() self.real_status = cass_service.CassandraAppStatus.set_status @@ -45,6 +79,9 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase): return_value=FakeInstanceServiceStatus()) self.context = TroveContext() self.manager = cass_manager.Manager() + self.manager._Manager__admin = cass_service.CassandraAdmin( + models.CassandraUser('Test')) + self.admin = self.manager._Manager__admin self.pkg = cass_service.packager self.real_db_app_status = cass_service.CassandraAppStatus self.origin_os_path_exists = os.path.exists @@ -74,10 +111,11 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase): netutils.get_my_ipv4 = self.original_get_ip cass_service.CassandraApp.make_host_reachable = ( self.orig_make_host_reachable) + cass_service.CassandraAppStatus.set_status = self.real_status def test_update_status(self): mock_status = MagicMock() - self.manager.appStatus = mock_status + self.manager.app.status = mock_status self.manager.update_status(self.context) mock_status.update.assert_any_call() @@ -109,8 +147,8 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase): mock_status = MagicMock() mock_app = MagicMock() - self.manager.appStatus = mock_status - self.manager.app = mock_app + mock_app.status = mock_status + self.manager._app = mock_app mock_status.begin_install = MagicMock(return_value=None) mock_app.install_if_needed = MagicMock(return_value=None) @@ -144,5 +182,419 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase): mock_app.install_if_needed.assert_any_call(packages) mock_app.init_storage_structure.assert_any_call('/var/lib/cassandra') mock_app.make_host_reachable.assert_any_call() - mock_app.start_db.assert_any_call() + mock_app.start_db.assert_any_call(update_db=False) mock_app.stop_db.assert_any_call() + + def test_keyspace_validation(self): + valid_name = self._get_random_name(32) + db = models.CassandraSchema(valid_name) + self.assertEqual(valid_name, db.name) + with ExpectedException(ValueError): + models.CassandraSchema(self._get_random_name(33)) + + def test_user_validation(self): + valid_name = self._get_random_name(65535) + usr = models.CassandraUser(valid_name, 'password') + self.assertEqual(valid_name, usr.name) + self.assertEqual('password', usr.password) + with ExpectedException(ValueError): + models.CassandraUser(self._get_random_name(65536)) + + @classmethod + def _serialize_collection(self, *collection): + return [item.serialize() for item in collection] + + @classmethod + def _get_random_name(self, size, chars=string.letters + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_create_database(self, conn): + db1 = models.CassandraSchema('db1') + db2 = models.CassandraSchema('db2') + db3 = models.CassandraSchema(self._get_random_name(32)) + + self.manager.create_database(self.context, + self._serialize_collection(db1, db2, db3)) + conn.return_value.execute.assert_has_calls([ + call(self.__CREATE_DB_FORMAT, (db1.name,)), + call(self.__CREATE_DB_FORMAT, (db2.name,)), + call(self.__CREATE_DB_FORMAT, (db3.name,)) + ]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_delete_database(self, conn): + db = models.CassandraSchema(self._get_random_name(32)) + self.manager.delete_database(self.context, db.serialize()) + conn.return_value.execute.assert_called_once_with( + self.__DROP_DB_FORMAT, (db.name,)) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_create_user(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2', '') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + + self.manager.create_user(self.context, + self._serialize_collection(usr1, usr2, usr3)) + conn.return_value.execute.assert_has_calls([ + call(self.__CREATE_USR_FORMAT, (usr1.name,), (usr1.password,)), + call(self.__CREATE_USR_FORMAT, (usr2.name,), (usr2.password,)), + call(self.__CREATE_USR_FORMAT, (usr3.name,), (usr3.password,)) + ]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_delete_user(self, conn): + usr = models.CassandraUser(self._get_random_name(1025), 'password') + self.manager.delete_user(self.context, usr.serialize()) + conn.return_value.execute.assert_called_once_with( + self.__DROP_USR_FORMAT, (usr.name,)) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_change_passwords(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2', '') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + + self.manager.change_passwords(self.context, self._serialize_collection( + usr1, usr2, usr3)) + conn.return_value.execute.assert_has_calls([ + call(self.__ALTER_USR_FORMAT, (usr1.name,), (usr1.password,)), + call(self.__ALTER_USR_FORMAT, (usr2.name,), (usr2.password,)), + call(self.__ALTER_USR_FORMAT, (usr3.name,), (usr3.password,)) + ]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_alter_user_password(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2', '') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + + self.admin.alter_user_password(usr1) + self.admin.alter_user_password(usr2) + self.admin.alter_user_password(usr3) + conn.return_value.execute.assert_has_calls([ + call(self.__ALTER_USR_FORMAT, (usr1.name,), (usr1.password,)), + call(self.__ALTER_USR_FORMAT, (usr2.name,), (usr2.password,)), + call(self.__ALTER_USR_FORMAT, (usr3.name,), (usr3.password,)) + ]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_grant_access(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr1', 'password') + db1 = models.CassandraSchema('db1') + db2 = models.CassandraSchema('db2') + db3 = models.CassandraSchema('db3') + + self.manager.grant_access(self.context, usr1.name, None, [db1.name, + db2.name]) + self.manager.grant_access(self.context, usr2.name, None, [db3.name]) + + expected = [] + for modifier in self.__ACCESS_MODIFIERS: + expected.append(call(self.__GRANT_FORMAT, + (modifier, db1.name, usr1.name))) + expected.append(call(self.__GRANT_FORMAT, + (modifier, db3.name, usr2.name))) + + conn.return_value.execute.assert_has_calls(expected, any_order=True) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_revoke_access(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr1', 'password') + db1 = models.CassandraSchema('db1') + db2 = models.CassandraSchema('db2') + + self.manager.revoke_access(self.context, usr1.name, None, db1.name) + self.manager.revoke_access(self.context, usr2.name, None, db2.name) + conn.return_value.execute.assert_has_calls([ + call(self.__REVOKE_FORMAT, (db1.name, usr1.name)), + call(self.__REVOKE_FORMAT, (db2.name, usr2.name)) + ]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_get_available_keyspaces(self, conn): + self.manager.list_databases(self.context) + conn.return_value.execute.assert_called_once_with( + self.__LIST_DB_FORMAT) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_list_databases(self, conn): + db1 = models.CassandraSchema('db1') + db2 = models.CassandraSchema('db2') + db3 = models.CassandraSchema(self._get_random_name(32)) + + with patch.object(self.admin, self.__N_GAK, return_value={db1, db2, + db3}): + found = self.manager.list_databases(self.context) + self.assertEqual(2, len(found)) + self.assertEqual(3, len(found[0])) + self.assertEqual(None, found[1]) + self.assertIn(db1.serialize(), found[0]) + self.assertIn(db2.serialize(), found[0]) + self.assertIn(db3.serialize(), found[0]) + + with patch.object(self.admin, self.__N_GAK, return_value=set()): + found = self.manager.list_databases(self.context) + self.assertEqual(([], None), found) + + def test_get_acl(self): + r0 = NonCallableMagicMock(username='user1', resource='', + permission='SELECT') + r1 = NonCallableMagicMock(username='user2', resource='', + permission='SELECT') + r2 = NonCallableMagicMock(username='user2', resource='', + permission='SELECT') + r3 = NonCallableMagicMock(username='user2', resource='', + permission='ALTER') + r4 = NonCallableMagicMock(username='user3', resource='', + permission='SELECT') + r5 = NonCallableMagicMock(username='user3', resource='', + permission='ALTER') + r6 = NonCallableMagicMock(username='user3', resource='', + permission='') + r7 = NonCallableMagicMock(username='user3', resource='', + permission='') + r8 = NonCallableMagicMock(username='user3', resource='', + permission='DELETE') + r9 = NonCallableMagicMock(username='user4', resource='', + permission='UPDATE') + r10 = NonCallableMagicMock(username='user4', resource='', + permission='DELETE') + + available_ks = {models.CassandraSchema('ks1'), + models.CassandraSchema('ks2'), + models.CassandraSchema('ks3')} + + mock_result_set = [r0, r1, r2, r3, r4, r5, r6, r7, r8, r9, r9, r9, r10] + execute_mock = MagicMock(return_value=mock_result_set) + mock_client = MagicMock(execute=execute_mock) + + with patch.object(self.admin, + self.__N_GAK, return_value=available_ks) as gak_mock: + acl = self.admin._get_acl(mock_client) + execute_mock.assert_called_once_with( + self.__LIST_PERMISSIONS_FORMAT) + gak_mock.assert_called_once_with(mock_client) + + self.assertEqual({'user1': {'ks1': {'SELECT'}, + 'ks2': {'SELECT'}, + 'ks3': {'SELECT'}}, + 'user2': {'ks1': {'SELECT'}, + 'ks2': {'SELECT', 'ALTER'}}, + 'user3': {'ks1': {'DELETE'}}, + 'user4': {'ks1': {'UPDATE', 'DELETE'}, + 'ks2': {'UPDATE'}, + 'ks3': {'UPDATE'}} + }, + acl) + + mock_result_set = [r1, r2, r3] + execute_mock = MagicMock(return_value=mock_result_set) + mock_client = MagicMock(execute=execute_mock) + + with patch.object(self.admin, + self.__N_GAK, return_value=available_ks) as gak_mock: + acl = self.admin._get_acl(mock_client, username='user2') + execute_mock.assert_called_once_with( + self.__LIST_PERMISSIONS_OF_FORMAT.format('user2')) + gak_mock.assert_not_called() + + self.assertEqual({'user2': {'ks1': {'SELECT'}, + 'ks2': {'SELECT', 'ALTER'}}}, acl) + + mock_result_set = [] + execute_mock = MagicMock(return_value=mock_result_set) + mock_client = MagicMock(execute=execute_mock) + + with patch.object(self.admin, + self.__N_GAK, return_value=available_ks) as gak_mock: + acl = self.admin._get_acl(mock_client, username='nonexisting') + execute_mock.assert_called_once_with( + self.__LIST_PERMISSIONS_OF_FORMAT.format('nonexisting')) + gak_mock.assert_not_called() + + self.assertEqual({}, acl) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_get_listed_users(self, conn): + usr1 = models.CassandraUser(self._get_random_name(1025)) + usr2 = models.CassandraUser(self._get_random_name(1025)) + usr3 = models.CassandraUser(self._get_random_name(1025)) + db1 = models.CassandraSchema('db1') + db2 = models.CassandraSchema('db2') + usr1.databases.append(db1.serialize()) + usr3.databases.append(db2.serialize()) + + rv_1 = NonCallableMagicMock() + rv_1.configure_mock(name=usr1.name, super=False) + rv_2 = NonCallableMagicMock() + rv_2.configure_mock(name=usr2.name, super=False) + rv_3 = NonCallableMagicMock() + rv_3.configure_mock(name=usr3.name, super=True) + + with patch.object(conn.return_value, 'execute', return_value=iter( + [rv_1, rv_2, rv_3])): + with patch.object(self.admin, '_get_acl', + return_value={usr1.name: {db1.name: {'SELECT'}, + db2.name: {}}, + usr3.name: {db2.name: {'SELECT'}}} + ): + usrs = self.manager.list_users(self.context) + conn.return_value.execute.assert_has_calls([ + call(self.__LIST_USR_FORMAT), + ], any_order=True) + self.assertIn(usr1.serialize(), usrs[0]) + self.assertIn(usr2.serialize(), usrs[0]) + self.assertIn(usr3.serialize(), usrs[0]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_list_access(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + db1 = models.CassandraSchema('db1').serialize() + db2 = models.CassandraSchema('db2').serialize() + usr2.databases.append(db1) + usr3.databases.append(db1) + usr3.databases.append(db2) + + with patch.object(self.admin, self.__N_GLU, return_value={usr1, usr2, + usr3}): + usr1_dbs = self.manager.list_access(self.context, usr1.name, None) + usr2_dbs = self.manager.list_access(self.context, usr2.name, None) + usr3_dbs = self.manager.list_access(self.context, usr3.name, None) + self.assertEqual([], usr1_dbs) + self.assertEqual([db1], usr2_dbs) + self.assertEqual([db1, db2], usr3_dbs) + + with patch.object(self.admin, self.__N_GLU, return_value=set()): + with ExpectedException(exception.UserNotFound): + self.manager.list_access(self.context, usr3.name, None) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_list_users(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + + with patch.object(self.admin, self.__N_GLU, return_value={usr1, usr2, + usr3}): + found = self.manager.list_users(self.context) + self.assertEqual(2, len(found)) + self.assertEqual(3, len(found[0])) + self.assertEqual(None, found[1]) + self.assertIn(usr1.serialize(), found[0]) + self.assertIn(usr2.serialize(), found[0]) + self.assertIn(usr3.serialize(), found[0]) + + with patch.object(self.admin, self.__N_GLU, return_value=set()): + self.assertEqual(([], None), self.manager.list_users(self.context)) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_get_user(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + + with patch.object(self.admin, self.__N_GLU, return_value={usr1, usr2, + usr3}): + found = self.manager.get_user(self.context, usr2.name, None) + self.assertEqual(usr2.serialize(), found) + + with patch.object(self.admin, self.__N_GLU, return_value=set()): + self.assertIsNone( + self.manager.get_user(self.context, usr2.name, None)) + + @patch.object(cass_service.CassandraAdmin, '_deserialize_keyspace', + side_effect=lambda p1: p1) + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_rename_user(self, conn, ks_deserializer): + usr = models.CassandraUser('usr') + db1 = models.CassandraSchema('db1').serialize() + db2 = models.CassandraSchema('db2').serialize() + usr.databases.append(db1) + usr.databases.append(db2) + + new_user = models.CassandraUser('new_user') + with patch(self.__N_CAU, return_value=new_user): + with patch.object(self.admin, self.__N_BU, return_value=usr): + with patch.object(self.admin, self.__N_CU) as create: + with patch.object(self.admin, self.__N_GFA) as grant: + with patch.object(self.admin, self.__N_DU) as drop: + usr_attrs = {'name': 'user', 'password': 'trove'} + self.manager.update_attributes(self.context, + usr.name, None, + usr_attrs) + create.assert_called_once_with(ANY, new_user) + grant.assert_has_calls([call(ANY, db1, ANY), + call(ANY, db2, ANY)]) + drop.assert_called_once_with(ANY, usr) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_update_attributes(self, conn): + usr = models.CassandraUser('usr', 'pwd') + + with patch.object(self.admin, self.__N_BU, return_value=usr): + usr_attrs = {'name': usr.name, 'password': usr.password} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + self.assertEqual(0, rename.call_count) + self.assertEqual(0, alter.call_count) + + usr_attrs = {'name': 'user', 'password': 'password'} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + rename.assert_called_once_with(ANY, usr, usr_attrs['name'], + usr_attrs['password']) + self.assertEqual(0, alter.call_count) + + usr_attrs = {'name': 'user', 'password': usr.password} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + rename.assert_called_once_with(ANY, usr, usr_attrs['name'], + usr_attrs['password']) + self.assertEqual(0, alter.call_count) + + usr_attrs = {'name': 'user'} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + with ExpectedException( + exception.UnprocessableEntity, "Updating username " + "requires specifying a password as well."): + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + self.assertEqual(0, rename.call_count) + self.assertEqual(0, alter.call_count) + + usr_attrs = {'name': usr.name, 'password': 'password'} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + alter.assert_called_once_with(ANY, usr) + self.assertEqual(0, rename.call_count) + + usr_attrs = {'password': usr.password} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + self.assertEqual(0, rename.call_count) + self.assertEqual(0, alter.call_count) + + usr_attrs = {'password': 'trove'} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + alter.assert_called_once_with(ANY, usr) + self.assertEqual(0, rename.call_count) diff --git a/trove/tests/unittests/guestagent/test_dbaas.py b/trove/tests/unittests/guestagent/test_dbaas.py index 37194e891d..4a9f3e9941 100644 --- a/trove/tests/unittests/guestagent/test_dbaas.py +++ b/trove/tests/unittests/guestagent/test_dbaas.py @@ -45,8 +45,6 @@ from trove.guestagent.common.operating_system import FileMode from trove.guestagent.common import sql_query from trove.guestagent.datastore.experimental.cassandra import ( service as cass_service) -from trove.guestagent.datastore.experimental.cassandra import ( - system as cass_system) from trove.guestagent.datastore.experimental.couchbase import ( service as couchservice) from trove.guestagent.datastore.experimental.couchdb import ( @@ -2354,17 +2352,14 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase): def setUp(self): super(CassandraDBAppTest, self).setUp(str(uuid4())) - self.exec_patch = patch.object(utils, 'execute_with_timeout') - self.addCleanup(self.exec_patch.stop) - self.exec_mock = self.exec_patch.start() self.sleep = time.sleep self.orig_time_time = time.time self.pkg_version = cass_service.packager.pkg_version self.pkg = cass_service.packager util.init_db() - status = FakeAppStatus(self.FAKE_ID, - rd_instance.ServiceStatuses.NEW) - self.cassandra = cass_service.CassandraApp(status) + self.cassandra = cass_service.CassandraApp() + self.cassandra.status = FakeAppStatus(self.FAKE_ID, + rd_instance.ServiceStatuses.NEW) self.orig_unlink = os.unlink @property @@ -2381,14 +2376,14 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase): @property def expected_service_candidates(self): - return cass_system.SERVICE_CANDIDATES + return self.cassandra.service_candidates def tearDown(self): - super(CassandraDBAppTest, self).tearDown() time.sleep = self.sleep time.time = self.orig_time_time cass_service.packager.pkg_version = self.pkg_version cass_service.packager = self.pkg + super(CassandraDBAppTest, self).tearDown() def assert_reported_status(self, expected_status): service_status = InstanceServiceStatus.find_by( @@ -2397,10 +2392,9 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase): @patch.object(utils, 'execute_with_timeout') def test_service_cleanup(self, exec_mock): - cass_service.CassandraAppStatus().cleanup_stalled_db_services() - exec_mock.assert_called_once_with( - cass_system.CASSANDRA_KILL, - shell=True) + cass_service.CassandraAppStatus(Mock()).cleanup_stalled_db_services() + exec_mock.assert_called_once_with(self.cassandra.CASSANDRA_KILL_CMD, + shell=True) def test_install(self): @@ -2424,72 +2418,24 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase): self.assert_reported_status(rd_instance.ServiceStatuses.NEW) - @patch('trove.guestagent.datastore.experimental.cassandra.service.LOG') - def test_cassandra_error_in_write_config_verify_unlink(self, *args): - # this test verifies not only that the write_config - # method properly invoked execute, but also that it properly - # attempted to unlink the file (as a result of the exception) - - mock_unlink = Mock(return_value=0) - - # We call tempfile.mkstemp() here and Mock() the mkstemp() - # parameter to write_config for testability. - (temp_handle, temp_config_name) = tempfile.mkstemp() - mock_mkstemp = MagicMock(return_value=(temp_handle, temp_config_name)) - - configuration = 'this is my configuration' - - with patch('trove.guestagent.common.operating_system.move', - side_effect=ProcessExecutionError('some exception')): - self.assertRaises(ProcessExecutionError, - self.cassandra.write_config, - config_contents=configuration, - execute_function=Mock(), - mkstemp_function=mock_mkstemp, - unlink_function=mock_unlink) - - self.assertEqual(1, mock_unlink.call_count) - - # really delete the temporary_config_file - os.unlink(temp_config_name) - @patch.multiple('trove.guestagent.common.operating_system', - chown=DEFAULT, chmod=DEFAULT, move=DEFAULT) - def test_cassandra_write_config(self, chown, chmod, move): - # ensure that write_config creates a temporary file, and then - # moves the file to the final place. Also validate the - # contents of the file written. - - # We call tempfile.mkstemp() here and Mock() the mkstemp() - # parameter to write_config for testability. - (temp_handle, temp_config_name) = tempfile.mkstemp() - mock_mkstemp = MagicMock(return_value=(temp_handle, temp_config_name)) - + chown=DEFAULT, chmod=DEFAULT, write_file=DEFAULT) + def test_cassandra_write_config(self, chown, chmod, write_file): configuration = 'some arbitrary configuration text' + self.cassandra.write_config(configuration) - mock_execute = MagicMock(return_value=('', '')) - - self.cassandra.write_config(configuration, - execute_function=mock_execute, - mkstemp_function=mock_mkstemp) - - move.assert_called_with(temp_config_name, cass_system.CASSANDRA_CONF, - as_root=True) - chown.assert_called_with(cass_system.CASSANDRA_CONF, + write_file.assert_called_with( + self.cassandra.cassandra_conf, + configuration, + codec=ANY, + as_root=True) + chown.assert_called_with(self.cassandra.cassandra_conf, "cassandra", "cassandra", recursive=False, as_root=True) chmod.assert_called_with( - cass_system.CASSANDRA_CONF, FileMode.ADD_READ_ALL, as_root=True) - - self.assertEqual(1, mock_mkstemp.call_count) - - with open(temp_config_name, 'r') as config_file: - configuration_data = config_file.read() - - self.assertEqual(configuration, configuration_data) - - # really delete the temporary_config_file - os.unlink(temp_config_name) + self.cassandra.cassandra_conf, + FileMode.ADD_READ_ALL, + as_root=True) class CouchbaseAppTest(BaseAppTest.AppTestCase):