From 94456be8fc2cdd87715ab8d8dbbac1250255731f Mon Sep 17 00:00:00 2001 From: Lisa Zangrando Date: Wed, 19 Jul 2017 12:39:53 +0200 Subject: [PATCH] Partition Director requires configuring Synergy through RESTful API Partition Director, the INDIGO-DataCloud service, requires the setting of the Synergy configuration (e.g. the list of projects allowed the access the shared quota, their values, etc) through the RESTful API. This implies some changes at configuration level of Synergy: some parameters must be drop out from the synergy_scheduler.conf file and the stored into a database. The use of the database for storing configuration parameters will be useful even for configuring Synergy in high availability mode. Bug: #1690123 Change-Id: Id8f9c6b0e0a2804b43984f7353dc3fc0882cd651 Sem-Ver: feature --- config/synergy_scheduler.conf | 42 ++- setup.cfg | 2 + synergy_scheduler_manager/client/command.py | 183 +++++++++ .../fairshare_manager.py | 283 ++++++-------- synergy_scheduler_manager/project_manager.py | 354 ++++++++++++++++++ synergy_scheduler_manager/quota_manager.py | 222 ++++++----- .../scheduler_manager.py | 174 ++------- .../functional/test_fairshare_manager.py | 87 +---- .../functional/test_scheduler_manager.py | 40 +- 9 files changed, 860 insertions(+), 527 deletions(-) create mode 100644 synergy_scheduler_manager/project_manager.py diff --git a/config/synergy_scheduler.conf b/config/synergy_scheduler.conf index 837db04..e1878cf 100644 --- a/config/synergy_scheduler.conf +++ b/config/synergy_scheduler.conf @@ -7,21 +7,6 @@ autostart = True # set the manager rate (minutes) rate = 1 -# set the list of projects accessing to the shared quota -# projects = prj_a, prj_b -#projects = - -# set the projects share -# shares = prj_a=70, prj_b=30 -#shares = - -# set the default max time to live (minutes) for VM/Container -default_TTL = 2880 - -# set, for the specified projects, the max time to live (minutes) for VM/Container -# TTLs = prj_a=1440, prj_b=2880 -#TTLs = - # set the max depth used by the backfilling strategy (default: 100) # this allows Synergy to not check the whole queue when looking for VMs to start backfill_depth = 100 @@ -200,3 +185,30 @@ autostart = True # set the manager rate (minutes) rate = 5 + + +[ProjectManager] +autostart = True + +# set the manager rate (minutes) +rate = 60 + +# set the Synergy database connection: +db_connection = DIALECT+DRIVER://USER:PASSWORD@DB_HOST/synergy + +# set the connection pool size (default: 10) +db_pool_size = 10 + +# set the number of seconds after which a connection is automatically +# recycled (default: 30) +db_pool_recycle = 30 + +# set the max overflow (default: 5) +db_max_overflow = 5 + +# set the default max time to live (minutes) for VM/Container +default_TTL = 2880 + +# set the default share value (default: 10) +default_share = 10 + diff --git a/setup.cfg b/setup.cfg index 188914b..a5bd87d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,8 +27,10 @@ synergy.managers = QuotaManager = synergy_scheduler_manager.quota_manager:QuotaManager FairShareManager = synergy_scheduler_manager.fairshare_manager:FairShareManager SchedulerManager = synergy_scheduler_manager.scheduler_manager:SchedulerManager + ProjectManager = synergy_scheduler_manager.project_manager:ProjectManager synergy.commands = + project = synergy_scheduler_manager.client.command:ProjectCommand quota = synergy_scheduler_manager.client.command:QuotaCommand queue = synergy_scheduler_manager.client.command:QueueCommand usage = synergy_scheduler_manager.client.command:UsageCommand diff --git a/synergy_scheduler_manager/client/command.py b/synergy_scheduler_manager/client/command.py index fa54950..ffedd8c 100644 --- a/synergy_scheduler_manager/client/command.py +++ b/synergy_scheduler_manager/client/command.py @@ -24,6 +24,189 @@ See the License for the specific language governing permissions and limitations under the License.""" +class ProjectCommand(ExecuteCommand): + + def __init__(self): + super(ProjectCommand, self).__init__("ProjectCommand") + + def configureParser(self, subparser): + prj_parser = subparser.add_parser('project') + prj_subparsers = prj_parser.add_subparsers(dest="command") + prj_subparsers.add_parser("list", add_help=True, + help="shows the projects list") + + show_parser = prj_subparsers.add_parser("show", add_help=True, + help="shows the project info") + group = show_parser.add_mutually_exclusive_group(required=True) + group.add_argument("-i", "--id", metavar="") + group.add_argument("-n", "--name", metavar="") + group.add_argument("-a", "--all", action="store_true") + show_parser.add_argument("-r", "--share", action="store_true") + show_parser.add_argument("-t", "--ttl", action="store_true") + show_parser.add_argument("-p", "--p_quota", action="store_true") + show_parser.add_argument("-s", "--s_quota", action="store_true") + show_parser.add_argument("-q", "--queue", action="store_true") + show_parser.add_argument("-l", "--long", action="store_true") + show_parser.add_argument("-u", "--usage", action="store_true") + + add_parser = prj_subparsers.add_parser("add", add_help=True, + help="adds a new project") + group = add_parser.add_mutually_exclusive_group(required=True) + group.add_argument("-i", "--id", metavar="") + group.add_argument("-n", "--name", metavar="") + add_parser.add_argument("-s", "--share", metavar="") + add_parser.add_argument("-t", "--ttl", metavar="") + + remove_parser = prj_subparsers.add_parser("remove", add_help=True, + help="removes a project") + group = remove_parser.add_mutually_exclusive_group(required=True) + group.add_argument("-i", "--id", metavar="") + group.add_argument("-n", "--name", metavar="") + + set_parser = prj_subparsers.add_parser("set", add_help=True, + help="sets the project values") + group = set_parser.add_mutually_exclusive_group(required=True) + group.add_argument("-i", "--id", metavar="") + group.add_argument("-n", "--name", metavar="") + set_parser.add_argument("-s", "--share", metavar="") + set_parser.add_argument("-t", "--ttl", metavar="") + + def execute(self, synergy_url, args): + id = getattr(args, 'id', None) + name = getattr(args, 'name', None) + command = getattr(args, 'command', None) + headers = ["name"] + + if command == "list": + cmd_args = {} + command = "GET_PROJECTS" + + elif command == "show": + if args.all: + cmd_args = {} + command = "GET_PROJECTS" + else: + cmd_args = {"id": id, "name": name} + command = "GET_PROJECT" + + if args.long: + headers.insert(0, "id") + if args.usage: + headers.append("usage") + if args.p_quota: + headers.append("private quota") + if args.s_quota: + headers.append("shared quota") + if args.queue: + headers.append("queue") + if args.share: + headers.append("share") + if args.ttl: + headers.append("TTL") + + elif command == "remove": + cmd_args = {"id": id, "name": name} + command = "REMOVE_PROJECT" + + else: + cmd_args = {"id": id, "name": name} + + TTL = getattr(args, 'ttl', None) + share = getattr(args, 'share', None) + + if TTL: + cmd_args["TTL"] = TTL + + if share: + cmd_args["share"] = share + + if command == "add": + command = "ADD_PROJECT" + headers.append("share") + headers.append("TTL") + elif command == "set": + command = "UPDATE_PROJECT" + + if TTL: + headers.append("TTL") + + if share: + headers.append("share") + + result = super(ProjectCommand, self).execute(synergy_url, + "ProjectManager", + command, + args=cmd_args) + + if isinstance(result, Project): + self.printProjects([result], headers) + else: + self.printProjects(result, headers) + + def printProjects(self, projects, headers): + if not projects: + return + + table = [] + + for project in projects: + row = [] + for attribute in headers: + if attribute == "id": + row.append(project.getId()) + + if attribute == "name": + row.append(project.getName()) + + if attribute == "share": + share = project.getShare() + share_value = share.getValue() + share_norm = share.getNormalizedValue() + row.append("{:.2f}% | {:.2f}%".format(share_value, + share_norm * 100)) + + if attribute == "TTL": + row.append(project.getTTL()) + + if attribute == "private quota": + quota = project.getQuota() + + private = "vcpus: {:.1f} of {:.1f} | ram: "\ + "{:.1f} of {:.1f}".format(quota.getUsage("vcpus"), + quota.getSize("vcpus"), + quota.getUsage("memory"), + quota.getSize("memory")) + + row.append(private) + + if attribute == "shared quota": + quota = project.getQuota() + + vcpus_size = quota.getSize("vcpus", private=False) + vcpus_usage = quota.getUsage("vcpus", private=False) + memory_size = quota.getSize("memory", private=False) + memory_usage = quota.getUsage("memory", private=False) + + shared = "vcpus: {:.1f} of {:.1f} | "\ + "ram: {:.1f} of {:.1f}".format( + vcpus_usage, vcpus_size, + memory_usage, memory_size) + + row.append(shared) + + if attribute == "usage": + data = project.getData() + + usage = "vcpus: {:.2f}% | ram: {:.2f}%".format( + data["effective_vcpus"] * 100, + data["effective_memory"] * 100) + + row.append(usage) + table.append(row) + + print(tabulate(table, headers, tablefmt="fancy_grid")) + + class QueueCommand(ExecuteCommand): def __init__(self): diff --git a/synergy_scheduler_manager/fairshare_manager.py b/synergy_scheduler_manager/fairshare_manager.py index 174cb0f..031fccd 100644 --- a/synergy_scheduler_manager/fairshare_manager.py +++ b/synergy_scheduler_manager/fairshare_manager.py @@ -1,10 +1,11 @@ import logging -import threading from datetime import datetime from datetime import timedelta from oslo_config import cfg from synergy.common.manager import Manager +from synergy.exception import SynergyError + __author__ = "Lisa Zangrando" __email__ = "lisa.zangrando[AT]pd.infn.it" @@ -47,16 +48,13 @@ class FairShareManager(Manager): def setup(self): if self.getManager("NovaManager") is None: - raise Exception("NovaManager not found!") + raise SynergyError("NovaManager not found!") - if self.getManager("QueueManager") is None: - raise Exception("QueueManager not found!") + if self.getManager("ProjectManager") is None: + raise SynergyError("ProjectManager not found!") - if self.getManager("QuotaManager") is None: - raise Exception("QuotaManager not found!") - - if self.getManager("KeystoneManager") is None: - raise Exception("KeystoneManager not found!") + self.nova_manager = self.getManager("NovaManager") + self.project_manager = self.getManager("ProjectManager") self.periods = CONF.FairShareManager.periods self.period_length = CONF.FairShareManager.period_length @@ -65,68 +63,32 @@ class FairShareManager(Manager): self.vcpus_weight = CONF.FairShareManager.vcpus_weight self.age_weight = CONF.FairShareManager.age_weight self.memory_weight = CONF.FairShareManager.memory_weight - self.projects = {} - self.workers = [] - self.exit = False - self.nova_manager = self.getManager("NovaManager") - self.queue_manager = self.getManager("QueueManager") - self.quota_manager = self.getManager("QuotaManager") - self.keystone_manager = self.getManager("KeystoneManager") - self.fs_condition = threading.Condition() if self.decay_weight < 0: self.decay_weight = float(0) elif self.decay_weight > 1: self.decay_weight = float(1) - def execute(self, command, *args, **kargs): - if command == "ADD_PROJECT": - return self.addProject(*args, **kargs) - elif command == "GET_PROJECT": - return self.getProject(*args, **kargs) - elif command == "GET_PROJECTS": - return self.getProjects() - elif command == "REMOVE_PROJECT": - return self.removeProject(*args, **kargs) - elif command == "GET_PRIORITY": - result = {} - for prj_id, project in self.projects.items(): - users = {} - - for user_id, user in project["users"].items(): - p = self.calculatePriority(user_id=user_id, prj_id=prj_id) - users[user["name"]] = p - - result[project["name"]] = users - return result - elif command == "CALCULATE_PRIORITY": - return self.calculatePriority(*args, **kargs) - elif command == "CALCULATE_FAIRSHARE": - return self.calculateFairShare(*args, **kargs) - else: - raise Exception("command=%r not supported!" % command) - def task(self): - with self.fs_condition: - try: - self.checkUsers() - self.calculateFairShare() - except Exception as ex: - LOG.error(ex) - raise ex - finally: - self.fs_condition.notifyAll() + try: + self._calculateFairShare() + except SynergyError as ex: + LOG.error(ex) + raise ex def destroy(self): pass def calculatePriority(self, user_id, prj_id, timestamp=None, retry=0): - if prj_id not in self.projects: - raise Exception("project=%s not found!" % prj_id) + project = self.project_manager.getProject(id=prj_id) + + if not project: + raise SynergyError("project=%s not found!" % prj_id) + + user = project.getUser(id=user_id) - user = self.projects[prj_id].getUser(id=user_id) if not user: - raise Exception("user=%s not found!" % user_id) + raise SynergyError("user=%s not found!" % user_id) priority = user.getPriority() fairshare_vcpus = priority.getFairShare("vcpus") @@ -146,117 +108,45 @@ class FairShareManager(Manager): return int(priority) - def addProject(self, project): - if self.projects.get(project.getId(), None): - raise Exception("project %s already exists!" % (project.getId())) + def doOnEvent(self, event_type, *args, **kwargs): + if event_type == "USER_ADDED": + user = kwargs.get("user", None) - prj_share = project.getShare() - if prj_share.getValue() == 0: - prj_share.setValue(self.default_share) + if not user: + return - self.projects[project.getId()] = project + share = user.getShare() - def getProject(self, prj_id): - if prj_id not in self.projects: - raise Exception("project name=%r not found!" % prj_id) + if not share or share.getValue() == 0: + share.setValue(self.default_share) - return self.projects.get(prj_id, None) + elif event_type == "PROJECT_ADDED": + project = kwargs.get("project", None) - def getProjects(self): - return self.projects + if not project: + return - def removeProject(self, prj_id): - if prj_id in self.projects: - with self.fs_condition: - del self.projects[prj_id] - self.fs_condition.notifyAll() + share = project.getShare() - def checkUsers(self): - if not self.projects: + if not share or share.getValue() == 0: + share.setValue(self.default_share) + + elif event_type == "PROJECT_REMOVED" or\ + event_type == "PROJECT_UPDATED": + pass + else: return - for project in self.projects.values(): - k_users = self.keystone_manager.getUsers(prj_id=project.getId()) - k_users_ids = [user.getId() for user in k_users] - - p_users = project.getUsers() - p_users_ids = [user.getId() for user in p_users] - - new_user_ids = list(set(k_users_ids) - set(p_users_ids)) - - deleted_user_ids = list(set(p_users_ids) - set(k_users_ids)) - for id in deleted_user_ids: - LOG.info("deleting user %s" % id) - project.removeUser(id) - - for user in k_users: - if user.getId() in new_user_ids: - LOG.info("found new user %s" % user.getName()) - date = datetime.utcnow() - - data = user.getData() - data["vcpus"] = float(0) - data["memory"] = float(0) - data["actual_memory"] = float(0) - data["actual_vcpus"] = float(0) - data["time_window_from_date"] = date - data["time_window_to_date"] = date - - try: - project.addUser(user) - except Exception: - pass - - def calculateFairShare(self): - if not self.projects: - return + self._calculateShares() + self._calculateFairShare() + def _calculateShares(self): total_prj_share = float(0) - total_memory = float(0) - total_vcpus = float(0) + projects = self.project_manager.getProjects() - to_date = datetime.utcnow() - - time_window_from_date = to_date - time_window_to_date = to_date - - for prj_id, project in self.projects.items(): - for user in project.getUsers(): - data = user.getData() - data["vcpus"] = float(0) - data["memory"] = float(0) - - for period in xrange(self.periods): - decay = self.decay_weight ** period - from_date = to_date - timedelta(days=(self.period_length)) - time_window_from_date = from_date - - for prj_id, project in self.projects.items(): - usages = self.nova_manager.getProjectUsage( - prj_id, from_date, to_date) - - for user_id, usage_rec in usages.items(): - decay_vcpus = decay * usage_rec["vcpus"] - decay_memory = decay * usage_rec["memory"] - - user = project.getUser(id=user_id) - - if user: - data = user.getData() - data["vcpus"] += decay_vcpus - data["memory"] += decay_memory - - total_vcpus += decay_vcpus - total_memory += decay_memory - - to_date = from_date - - for project in self.projects.values(): + for project in projects.values(): prj_share = project.getShare() - if prj_share.getValue() == 0: - prj_share.setValue(self.default_share) - # check the share for each user and update the usage_record sibling_share = float(0) @@ -278,15 +168,7 @@ class FairShareManager(Manager): total_prj_share += prj_share.getValue() - for prj_id, project in self.projects.items(): - prj_data = project.getData() - prj_data["actual_memory"] = float(0) - prj_data["actual_vcpus"] = float(0) - prj_data["effective_memory"] = float(0) - prj_data["effective_vcpus"] = float(0) - prj_data["time_window_from_date"] = time_window_from_date - prj_data["time_window_to_date"] = time_window_to_date - + for project in projects.values(): prj_share = project.getShare() prj_share.setSiblingValue(total_prj_share) prj_share.setNormalizedValue( @@ -300,10 +182,75 @@ class FairShareManager(Manager): usr_share.getValue() / usr_share.getSiblingValue() * prj_share.getNormalizedValue()) + def _calculateFairShare(self): + projects = self.project_manager.getProjects() + + if not projects: + return + + total_memory = float(0) + total_vcpus = float(0) + + to_date = datetime.utcnow() + + time_window_from_date = to_date + time_window_to_date = to_date + + for prj_id, project in projects.items(): + prj_data = project.getData() + prj_data["actual_vcpus"] = float(0) + prj_data["actual_memory"] = float(0) + prj_data["effective_vcpus"] = float(0) + prj_data["effective_memory"] = float(0) + prj_data["time_window_from_date"] = time_window_from_date + prj_data["time_window_to_date"] = time_window_to_date + + for user in project.getUsers(): + usr_data = user.getData() + usr_data["vcpus"] = float(0) + usr_data["memory"] = float(0) + usr_data["actual_vcpus"] = float(0) + usr_data["actual_memory"] = float(0) + usr_data["effective_vcpus"] = float(0) + usr_data["effective_memory"] = float(0) + usr_data["actual_rel_vcpus"] = float(0) + usr_data["actual_rel_memory"] = float(0) + usr_data["time_window_from_date"] = time_window_from_date + usr_data["time_window_to_date"] = time_window_to_date + + for period in xrange(self.periods): + decay = self.decay_weight ** period + from_date = to_date - timedelta(days=(self.period_length)) + time_window_from_date = from_date + + for prj_id, project in projects.items(): + usages = self.nova_manager.getProjectUsage( + prj_id, from_date, to_date) + + for user_id, usage_rec in usages.items(): + decay_vcpus = decay * usage_rec["vcpus"] + decay_memory = decay * usage_rec["memory"] + + user = project.getUser(id=user_id) + + if user: + data = user.getData() + data["vcpus"] += decay_vcpus + data["memory"] += decay_memory + + total_vcpus += decay_vcpus + total_memory += decay_memory + + to_date = from_date + + for prj_id, project in projects.items(): + prj_data = project.getData() + prj_data["time_window_to_date"] = time_window_to_date + + for user in project.getUsers(): usr_data = user.getData() usr_data["actual_memory"] = usr_data["memory"] usr_data["actual_vcpus"] = usr_data["vcpus"] - usr_data["time_window_from_date"] = time_window_from_date usr_data["time_window_to_date"] = time_window_to_date if total_memory > 0: @@ -315,7 +262,7 @@ class FairShareManager(Manager): prj_data["actual_memory"] += usr_data["actual_memory"] prj_data["actual_vcpus"] += usr_data["actual_vcpus"] - for project in self.projects.values(): + for project in projects.values(): prj_data = project.getData() prj_data["effective_memory"] = prj_data["actual_memory"] prj_data["effective_vcpus"] = prj_data["actual_vcpus"] @@ -327,10 +274,6 @@ class FairShareManager(Manager): sibling_share = usr_share.getSiblingValue() norm_share = usr_share.getNormalizedValue() usr_data = user.getData() - usr_data["effective_vcpus"] = float(0) - usr_data["effective_memory"] = float(0) - usr_data["actual_rel_vcpus"] = float(0) - usr_data["actual_rel_memory"] = float(0) if prj_data["actual_vcpus"] > 0: usr_data["actual_rel_vcpus"] = usr_data["actual_vcpus"] @@ -361,5 +304,3 @@ class FairShareManager(Manager): usr_priority.setValue(float(self.vcpus_weight) * f_vcpus + float(self.memory_weight) * f_memory) - - LOG.debug("fairshare project %s" % project.serialize()) diff --git a/synergy_scheduler_manager/project_manager.py b/synergy_scheduler_manager/project_manager.py new file mode 100644 index 0000000..3c7e734 --- /dev/null +++ b/synergy_scheduler_manager/project_manager.py @@ -0,0 +1,354 @@ +import logging + +from common.project import Project +from oslo_config import cfg +from sqlalchemy import create_engine +from sqlalchemy.exc import SQLAlchemyError +from synergy.common.manager import Manager +from synergy.exception import SynergyError + + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. +See the License for the specific language governing +permissions and limitations under the License.""" + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class ProjectManager(Manager): + + def __init__(self): + super(ProjectManager, self).__init__(name="ProjectManager") + + self.config_opts = [ + cfg.IntOpt("default_TTL", default=1440, required=False), + cfg.FloatOpt("default_share", default=10.0, required=False), + cfg.StrOpt("db_connection", help="the DB url", required=True), + cfg.IntOpt('db_pool_size', default=10, required=False), + cfg.IntOpt('db_pool_recycle', default=30, required=False), + cfg.IntOpt('db_max_overflow', default=5, required=False) + ] + + self.projects = {} + + def setup(self): + self.default_TTL = CONF.ProjectManager.default_TTL + self.default_share = CONF.ProjectManager.default_share + + db_connection = CONF.ProjectManager.db_connection + pool_size = CONF.ProjectManager.db_pool_size + pool_recycle = CONF.ProjectManager.db_pool_recycle + max_overflow = CONF.ProjectManager.db_max_overflow + + try: + self.db_engine = create_engine(db_connection, + pool_size=pool_size, + pool_recycle=pool_recycle, + max_overflow=max_overflow) + except SQLAlchemyError as ex: + LOG.error(ex) + raise ex + + self.configured = False + self.keystone_manager = self.getManager("KeystoneManager") + self.createTable() + + def task(self): + if not self.configured: + self.buildFromDB() + self.configured = True + + def destroy(self): + pass + + def execute(self, command, *args, **kargs): + if command == "GET_PROJECTS": + return self.projects.values() + + prj_id = kargs.get("id", None) + prj_name = kargs.get("name", None) + + project = self.getProject(prj_id, prj_name) + + if command == "GET_PROJECT": + if project: + return project + else: + raise SynergyError("project not found!") + + elif command == "ADD_PROJECT": + if project: + raise SynergyError("the project id=%s name=%s already exists!" + % (project.getId(), project.getName())) + + TTL = kargs.get("TTL", None) + share = kargs.get("share", None) + + return self._addProject(prj_id, prj_name, TTL, share) + + elif command == "UPDATE_PROJECT": + if not project: + raise SynergyError("project not found!") + + TTL = kargs.get("TTL", None) + share = kargs.get("share", None) + + return self._updateProject(project, TTL, share) + + elif command == "REMOVE_PROJECT": + if not project: + raise SynergyError("project not found!") + + self._removeProject(project) + else: + raise SynergyError("command %r not supported!" % command) + + def doOnEvent(self, event_type, *args, **kwargs): + if event_type == "identity.role_assignment.created": + usr_id = kwargs.get("user", None) + prj_id = kwargs.get("project", None) + project = self.getProject(id=prj_id) + + if project and not project.getUser(id=usr_id): + user = self.keystone_manager.getUser(usr_id) + if user: + project.addUser(user) + + self.notify(event_type="USER_ADDED", user=user) + + elif event_type == "identity.role_assignment.deleted": + usr_id = kwargs.get("user", None) + prj_id = kwargs.get("project", None) + project = self.getProject(id=prj_id) + + if project: + project.removeUser(usr_id) + self.notify(event_type="USER_REMOVED", user=user) + + elif event_type == "identity.user.deleted": + user_id = kwargs.get("resource_info", None) + + for project in self.projects.values(): + try: + user = project.getUser(id=user_id) + if user: + project.removeUser(user_id) + + self.notify(event_type="USER_DELETED", user=user) + except SynergyError as ex: + LOG.info(ex) + + def _parseNumber(self, value, default=None): + if not value: + return default + + try: + return int(value) + except SynergyError: + if default: + return default + raise SynergyError("%r is not a number!" % str(value)) + + def _addProject(self, prj_id, prj_name, TTL, share): + if prj_id: + project = self.keystone_manager.getProject(prj_id) + elif prj_name: + projects = self.keystone_manager.getProjects(name=prj_name) + + if len(projects) > 1: + raise SynergyError("ambiguity: found %s projects having %r" + " as name" % (len(projects), prj_name)) + if projects: + project = projects[0] + else: + raise SynergyError("missing project attributes") + + if not project: + raise SynergyError("project not found!") + + prj_TTL = self._parseNumber(TTL, default=self.default_TTL) + prj_share = self._parseNumber(share, 0) + + project.setTTL(prj_TTL) + project.getShare().setValue(prj_share) + + QUERY = "insert into project (id, name, share, TTL) " \ + "values (%s, %s, %s, %s)" + + connection = self.db_engine.connect() + trans = connection.begin() + + try: + connection.execute( + QUERY, [project.getId(), project.getName(), + project.getShare().getValue(), project.getTTL()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + + if "Duplicate entry" in ex.message: + raise SynergyError("the project id=%s name=%s already exists!" + % (project.getId(), project.getName())) + else: + raise(ex) + finally: + connection.close() + + self.projects[project.getId()] = project + + LOG.info("added project %s" % project.getName()) + self.notify(event_type="PROJECT_ADDED", project=project) + + return project + + def _updateProject(self, project, TTL, share): + if not project: + return + + TTL = self._parseNumber(TTL) + if TTL: + TTL = self._parseNumber(TTL) + if TTL <= 0: + raise SynergyError("wrong TTL value: %s <= 0" % TTL) + project.setTTL(TTL) + + share = self._parseNumber(share) + if share: + if share <= 0: + raise SynergyError("wrong share value: %s <= 0" % share) + project.getShare().setValue(share) + + connection = self.db_engine.connect() + trans = connection.begin() + + try: + QUERY = "update project set share=%s, TTL=%s where id=%s" + + connection.execute(QUERY, [project.getShare().getValue(), + project.getTTL(), + project.getId()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + + raise SynergyError(ex.message) + finally: + connection.close() + + self.notify(event_type="PROJECT_UPDATED", project=project) + + def _removeProject(self, project): + if project.getId() not in self.projects.keys(): + raise SynergyError("project %s not found!" % project.getId()) + + self.projects.pop(project.getId()) + + connection = self.db_engine.connect() + trans = connection.begin() + + try: + QUERY = "delete from project where id=%s" + + connection.execute(QUERY, [project.getId()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + + raise SynergyError(ex.message) + finally: + connection.close() + + LOG.info("removed project %s" % project.getName()) + self.notify(event_type="PROJECT_REMOVED", project=project) + + def getProject(self, id=None, name=None): + if not id and not name: + raise SynergyError("please define the project id or its name!") + + project = None + + if id: + project = self.projects.get(id, None) + elif name: + for prj in self.projects.values(): + if name == prj.getName(): + project = prj + break + + return project + + def getProjects(self): + return self.projects + + def createTable(self): + TABLE = """CREATE TABLE IF NOT EXISTS project (`id` VARCHAR(64) \ +NOT NULL PRIMARY KEY, name VARCHAR(64), share INT DEFAULT 0, TTL INT DEFAULT \ +1440) ENGINE=InnoDB""" + + connection = self.db_engine.connect() + + try: + connection.execute(TABLE) + except SQLAlchemyError as ex: + raise SynergyError(ex.message) + except Exception as ex: + raise SynergyError(ex.message) + finally: + connection.close() + + def buildFromDB(self): + connection = self.db_engine.connect() + + try: + QUERY = "select id, name, share, TTL from project" + result = connection.execute(QUERY) + + for row in result: + project = Project() + project.setId(row[0]) + project.setName(row[1]) + project.getShare().setValue(row[2]) + project.setTTL(row[3]) + project.setId(row[0]) + project_id = project.getId() + try: + k_project = self.keystone_manager.getProject(project_id) + + if not k_project: + self.removeProject(project) + continue + + users = self.keystone_manager.getUsers(prj_id=project_id) + + for user in users: + project.addUser(user) + + self.projects[project.getId()] = project + + self.notify(event_type="PROJECT_ADDED", project=project) + except SynergyError as ex: + LOG.info("the project %s seems not to exist anymore! " + "(reason=%s)" % (project.getName(), ex.message)) + self.removeProject(project) + except SQLAlchemyError as ex: + raise SynergyError(ex.message) + finally: + connection.close() diff --git a/synergy_scheduler_manager/quota_manager.py b/synergy_scheduler_manager/quota_manager.py index 750c628..ce04daf 100644 --- a/synergy_scheduler_manager/quota_manager.py +++ b/synergy_scheduler_manager/quota_manager.py @@ -4,6 +4,7 @@ import logging from common.quota import SharedQuota from oslo_config import cfg from synergy.common.manager import Manager +from synergy.exception import SynergyError __author__ = "Lisa Zangrando" @@ -39,152 +40,143 @@ class QuotaManager(Manager): self.projects = {} if self.getManager("NovaManager") is None: - raise Exception("NovaManager not found!") + raise SynergyError("NovaManager not found!") if self.getManager("KeystoneManager") is None: - raise Exception("KeystoneManager not found!") + raise SynergyError("KeystoneManager not found!") + + if self.getManager("ProjectManager") is None: + raise SynergyError("ProjectManager not found!") self.nova_manager = self.getManager("NovaManager") self.keystone_manager = self.getManager("KeystoneManager") - self.listener = None + self.project_manager = self.getManager("ProjectManager") def destroy(self): LOG.info("destroy invoked!") SharedQuota.disable() def execute(self, command, *args, **kargs): - if command == "show": - prj_id = kargs.get("project_id", None) - prj_name = kargs.get("project_name", None) - all_projects = kargs.get("all_projects", None) - - if prj_id: - project = self.projects.get(prj_id, None) + if command == "GET_PRIVATE_QUOTA": + prj_id = kargs.get("id", None) + prj_name = kargs.get("name", None) + project = self.project_manager.getProject(prj_id, prj_name) + if project: + return project.getQuota() + else: + raise SynergyError("project not found!") if project: return project - - raise Exception("project (id=%r) not found!" % prj_id) - elif prj_name: - for project in self.projects.values(): - if prj_name == project.getName(): - return project - - raise Exception("project (name=%r) not found!" % prj_name) - elif all_projects: - return self.projects.values() - else: - return SharedQuota() elif command == "GET_SHARED_QUOTA": return SharedQuota() - elif command == "GET_PROJECTS": - return self.projects.values() - elif command == "GET_PROJECT": - return self.getProject(*args, **kargs) else: - raise Exception("command=%r not supported!" % command) + raise SynergyError("command %r not supported!" % command) def task(self): try: self.updateSharedQuota() self.deleteExpiredServers() - except Exception as ex: + except SynergyError as ex: LOG.error(ex) - def getProject(self, prj_id): - return self.projects.get(prj_id, None) + def doOnEvent(self, event_type, *args, **kwargs): + if event_type == "PROJECT_ADDED": + project = kwargs.get("project", None) - def getProjects(self): - return self.projects + if not project: + return - def addProject(self, project): - if self.projects.get(project.getId(), None): - raise Exception("project %r already exists!" % (project.getId())) + try: + quota = self.nova_manager.getQuota(project.getId()) + + if quota.getSize("vcpus") > 0 and \ + quota.getSize("memory") > 0 and \ + quota.getSize("instances") > 0: + self.nova_manager.updateQuota(quota, is_class=True) + + quota.setSize("vcpus", -1) + quota.setSize("memory", -1) + quota.setSize("instances", -1) + + self.nova_manager.updateQuota(quota) + + class_quota = self.nova_manager.getQuota( + project.getId(), is_class=True) + + quota = project.getQuota() + quota.setId(project.getId()) + quota.setSize("vcpus", class_quota.getSize("vcpus")) + quota.setSize("memory", class_quota.getSize("memory")) + quota.setSize("instances", class_quota.getSize("instances")) + quota.setSize( + "vcpus", SharedQuota.getSize("vcpus"), private=False) + quota.setSize( + "memory", SharedQuota.getSize("memory"), private=False) + quota.setSize( + "instances", SharedQuota.getSize("instances"), + private=False) + + servers = self.nova_manager.getProjectServers(project.getId()) + + for server in servers: + if server.getState() != "building": + try: + quota.allocate(server) + except SynergyError as ex: + fl = server.getFlavor() + vcpus_size = quota.getSize("vcpus") + fl.getVCPUs() + mem_size = quota.getSize("memory") + fl.getMemory() + + quota.setSize("vcpus", vcpus_size) + quota.setSize("memory", mem_size) + + self.nova_manager.updateQuota(quota, is_class=True) + + LOG.warn("private quota autoresized (vcpus=%s, " + "memory=%s) for project %r (id=%s)" + % (quota.getSize("vcpus"), + quota.getSize("memory"), + project.getName(), + project.getId())) + quota.allocate(server) + + self.projects[project.getId()] = project + except SynergyError as ex: + LOG.error(ex) + raise ex + + elif event_type == "PROJECT_REMOVED": + project = kwargs.get("project", None) + + if not project: + return - try: quota = self.nova_manager.getQuota(project.getId()) - if quota.getSize("vcpus") > 0 and \ - quota.getSize("memory") > 0 and \ - quota.getSize("instances") > 0: + if quota.getSize("vcpus") <= -1 and \ + quota.getSize("memory") <= -1 and \ + quota.getSize("instances") <= -1: - self.nova_manager.updateQuota(quota, is_class=True) - - quota.setSize("vcpus", -1) - quota.setSize("memory", -1) - quota.setSize("instances", -1) - - self.nova_manager.updateQuota(quota) - - class_quota = self.nova_manager.getQuota( - project.getId(), is_class=True) + qc = self.nova_manager.getQuota(project.getId(), is_class=True) + self.nova_manager.updateQuota(qc) quota = project.getQuota() - quota.setId(project.getId()) - quota.setSize("vcpus", class_quota.getSize("vcpus")) - quota.setSize("memory", class_quota.getSize("memory")) - quota.setSize("instances", class_quota.getSize("instances")) - quota.setSize( - "vcpus", SharedQuota.getSize("vcpus"), private=False) - quota.setSize( - "memory", SharedQuota.getSize("memory"), private=False) - quota.setSize( - "instances", SharedQuota.getSize("instances"), private=False) - servers = self.nova_manager.getProjectServers(project.getId()) - - for server in servers: - if server.getState() != "building": - try: - quota.allocate(server) - except Exception as ex: - flavor = server.getFlavor() - vcpus_size = quota.getSize("vcpus") + flavor.getVCPUs() - mem_size = quota.getSize("memory") + flavor.getMemory() - - quota.setSize("vcpus", vcpus_size) - quota.setSize("memory", mem_size) - - self.nova_manager.updateQuota(quota, is_class=True) - - LOG.warn("private quota autoresized (vcpus=%s, " - "memory=%s) for project %r (id=%s)" - % (quota.getSize("vcpus"), - quota.getSize("memory"), - project.getName(), - project.getId())) - quota.allocate(server) - - self.projects[project.getId()] = project - except Exception as ex: - LOG.error(ex) - raise ex - - def removeProject(self, project, destroy=False): - project = self.projects[project.getId()] - - if project is None: - return - - try: - if destroy: - quota = project.getQuota() - - ids = [] - ids.extend(quota.getServers("active", private=False)) - ids.extend(quota.getServers("pending", private=False)) - ids.extend(quota.getServers("error", private=False)) + ids = [] + ids.extend(quota.getServers("active", private=False)) + ids.extend(quota.getServers("building", private=False)) + ids.extend(quota.getServers("error", private=False)) + try: for server_id in ids: self.nova_manager.deleteServer(server_id) - - del self.projects[project.getId()] - except Exception as ex: - LOG.error(ex) - raise ex + except SynergyError as ex: + LOG.error(ex) + raise ex def deleteExpiredServers(self): - for prj_id, project in self.getProjects().items(): + for prj_id, project in self.project_manager.getProjects().items(): TTL = project.getTTL() quota = project.getQuota() @@ -217,7 +209,7 @@ class QuotaManager(Manager): % (uuid, TTL, state, prj_id)) self.nova_manager.deleteServer(server) - except Exception as ex: + except SynergyError as ex: LOG.error(ex) raise ex @@ -252,7 +244,7 @@ class QuotaManager(Manager): domain = self.keystone_manager.getDomains(name="default") if not domain: - raise Exception("domain 'default' not found!") + raise SynergyError("domain 'default' not found!") domain = domain[0] dom_id = domain.getId() @@ -260,7 +252,7 @@ class QuotaManager(Manager): kprojects = self.keystone_manager.getProjects(domain_id=dom_id) for kproject in kprojects: - project = self.getProject(kproject.getId()) + project = self.project_manager.getProject(id=kproject.getId()) if project: quota = self.nova_manager.getQuota(project.getId(), @@ -298,7 +290,7 @@ class QuotaManager(Manager): enabled = False if total_vcpus < static_vcpus: - if self.getProjects(): + if self.project_manager.getProjects(): LOG.warn("shared quota: the total statically " "allocated vcpus (%s) is greater than the " "total amount of vcpus allowed (%s)" @@ -307,7 +299,7 @@ class QuotaManager(Manager): shared_vcpus = total_vcpus - static_vcpus if total_memory < static_memory: - if self.getProjects(): + if self.project_manager.getProjects(): LOG.warn("shared quota: the total statically " "allocated memory (%s) is greater than " "the total amount of memory allowed (%s)" @@ -330,10 +322,10 @@ class QuotaManager(Manager): SharedQuota.setSize("vcpus", 0) SharedQuota.setSize("memory", 0) - for project in self.getProjects().values(): + for project in self.project_manager.getProjects().values(): quota = project.getQuota() quota.setSize("vcpus", shared_vcpus, private=False) quota.setSize("memory", shared_memory, private=False) - except Exception as ex: + except SynergyError as ex: LOG.error(ex) raise ex diff --git a/synergy_scheduler_manager/scheduler_manager.py b/synergy_scheduler_manager/scheduler_manager.py index f9dc7f6..f3170f7 100644 --- a/synergy_scheduler_manager/scheduler_manager.py +++ b/synergy_scheduler_manager/scheduler_manager.py @@ -1,5 +1,4 @@ -import logging -import re +import logging from common.flavor import Flavor from common.quota import SharedQuota @@ -7,6 +6,7 @@ from common.request import Request from common.server import Server from oslo_config import cfg from synergy.common.manager import Manager +from synergy.exception import SynergyError from threading import Thread @@ -153,7 +153,7 @@ class Notifications(object): class Worker(Thread): - def __init__(self, name, queue, projects, nova_manager, + def __init__(self, name, queue, project_manager, nova_manager, keystone_manager, backfill_depth=100): super(Worker, self).__init__() self.setDaemon(True) @@ -161,7 +161,7 @@ class Worker(Thread): self.name = name self.backfill_depth = backfill_depth self.queue = queue - self.projects = projects + self.project_manager = project_manager self.nova_manager = nova_manager self.keystone_manager = keystone_manager self.exit = False @@ -175,7 +175,7 @@ class Worker(Thread): self.queue.close() self.exit = True - except Exception as ex: + except SynergyError as ex: LOG.error(ex) raise ex @@ -223,14 +223,19 @@ class Worker(Thread): # or server["OS-EXT-STS:task_state"] != "scheduling": self.queue.deleteItem(queue_item) continue - except Exception as ex: + except SynergyError as ex: LOG.warn("the server %s is not anymore available!" - "(reason=%s)" % (server_id, ex)) - self.queue.deleteItem(queue_item) + " (reason=%s)" % (server_id, ex)) + self.queue.delete(queue_item) continue - quota = self.projects[prj_id].getQuota() + project = self.project_manager.getProject(id=prj_id) + + if not project: + raise SynergyError("project %r not found!" % prj_id) + + quota = project.getQuota() blocking = False if server.isEphemeral() and not SharedQuota.isEnabled(): @@ -246,7 +251,7 @@ class Worker(Thread): context["auth_token"] = token.getId() context["user_id"] = token.getUser().getId() - except Exception as ex: + except SynergyError as ex: LOG.error("error on getting the token for server " "%s (reason=%s)" % (server.getId(), ex)) raise ex @@ -258,7 +263,7 @@ class Worker(Thread): "ta=shared)" % (server_id, user_id, prj_id)) found = True - except Exception as ex: + except SynergyError as ex: LOG.error("error on building the server %s (reason=%s)" % (server.getId(), ex)) @@ -270,7 +275,7 @@ class Worker(Thread): else: queue_items.append(queue_item) - except Exception as ex: + except SynergyError as ex: LOG.error("Exception has occured", exc_info=1) LOG.error("Worker %s: %s" % (self.name, ex)) @@ -287,35 +292,34 @@ class SchedulerManager(Manager): self.config_opts = [ cfg.StrOpt("notification_topic", default="notifications"), cfg.IntOpt("backfill_depth", default=100), - cfg.FloatOpt("default_TTL", default=10.0), - cfg.ListOpt("projects", default=[], help="the projects list"), - cfg.ListOpt("shares", default=[], help="the shares list"), - cfg.ListOpt("TTLs", default=[], help="the TTLs list") ] self.workers = [] def setup(self): if self.getManager("NovaManager") is None: - raise Exception("NovaManager not found!") + raise SynergyError("NovaManager not found!") if self.getManager("QueueManager") is None: - raise Exception("QueueManager not found!") + raise SynergyError("QueueManager not found!") if self.getManager("QuotaManager") is None: - raise Exception("QuotaManager not found!") + raise SynergyError("QuotaManager not found!") if self.getManager("KeystoneManager") is None: - raise Exception("KeystoneManager not found!") + raise SynergyError("KeystoneManager not found!") if self.getManager("FairShareManager") is None: - raise Exception("FairShareManager not found!") + raise SynergyError("FairShareManager not found!") + + if self.getManager("ProjectManager") is None: + raise SynergyError("ProjectManager not found!") self.nova_manager = self.getManager("NovaManager") self.queue_manager = self.getManager("QueueManager") self.quota_manager = self.getManager("QuotaManager") self.keystone_manager = self.getManager("KeystoneManager") self.fairshare_manager = self.getManager("FairShareManager") - self.default_TTL = float(CONF.SchedulerManager.default_TTL) + self.project_manager = self.getManager("ProjectManager") self.backfill_depth = CONF.SchedulerManager.backfill_depth self.notification_topic = CONF.SchedulerManager.notification_topic self.projects = {} @@ -323,129 +327,18 @@ class SchedulerManager(Manager): self.exit = False self.configured = False - def parseAttribute(self, attribute): - if attribute is None: - return None - - parsed_attribute = re.split('=', attribute) - - if len(parsed_attribute) > 1: - if not parsed_attribute[-1].isdigit(): - raise Exception("wrong value %r found in %r!" - % (parsed_attribute[-1], parsed_attribute)) - - if len(parsed_attribute) == 2: - prj_name = parsed_attribute[0] - value = float(parsed_attribute[1]) - else: - raise Exception("wrong attribute definition: %r" - % parsed_attribute) - else: - raise Exception("wrong attribute definition: %r" - % parsed_attribute) - - return (prj_name, value) - def execute(self, command, *args, **kargs): - if command == "show": - usr_id = kargs.get("user_id", None) - usr_name = kargs.get("user_name", None) - all_users = kargs.get("all_users", False) - all_projects = kargs.get("all_projects", False) - prj_id = kargs.get("project_id", None) - prj_name = kargs.get("project_name", None) - project = None - - if all_projects: - return self.projects.values() - - if (usr_id is not None or usr_name is not None or all_users) and \ - prj_id is None and prj_name is None: - raise Exception("project id or name not defined!") - - if prj_id: - project = self.projects.get(prj_id, None) - - if not project: - raise Exception("project %s not found!" % prj_id) - elif prj_name: - for prj in self.projects.values(): - if prj_name == prj.getName(): - project = prj - break - - if not project: - raise Exception("project %r not found!" % prj_name) - elif not all_users: - return self.projects.values() - - if usr_id or usr_name: - return project.getUser(id=usr_id, name=usr_name) - elif all_users: - return project.getUsers() - else: - return project - else: - raise Exception("command=%r not supported!" % command) + raise SynergyError("command %r not supported!" % command) def task(self): if self.configured: return - domain = self.keystone_manager.getDomains(name="default") - if not domain: - raise Exception("domain 'default' not found!") - - domain = domain[0] - dom_id = domain.getId() - - for project in self.keystone_manager.getProjects(domain_id=dom_id): - if project.getName() in CONF.SchedulerManager.projects: - CONF.SchedulerManager.projects.remove(project.getName()) - project.setTTL(self.default_TTL) - - self.projects[project.getName()] = project - else: - quota = self.nova_manager.getQuota(project.getId()) - - if quota.getSize("vcpus") <= -1 and \ - quota.getSize("memory") <= -1 and \ - quota.getSize("instances") <= -1: - - qc = self.nova_manager.getQuota(project.getId(), - is_class=True) - - self.nova_manager.updateQuota(qc) - - if len(CONF.SchedulerManager.projects) > 0: - raise Exception("projects %s not found, please check the syn" - "ergy.conf" % CONF.SchedulerManager.projects) - self.quota_manager.updateSharedQuota() - for prj_ttl in CONF.SchedulerManager.TTLs: - prj_name, TTL = self.parseAttribute(prj_ttl) - self.projects[prj_name].setTTL(TTL) - - for prj_share in CONF.SchedulerManager.shares: - prj_name, share_value = self.parseAttribute(prj_share) - p_share = self.projects[prj_name].getShare() - p_share.setValue(share_value) - - for prj_name, project in self.projects.items(): - del self.projects[prj_name] - self.projects[project.getId()] = project - - self.quota_manager.addProject(project) - self.fairshare_manager.addProject(project) - - self.quota_manager.updateSharedQuota() - self.fairshare_manager.checkUsers() - self.fairshare_manager.calculateFairShare() - try: self.dynamic_queue = self.queue_manager.createQueue("DYNAMIC") - except Exception as ex: + except SynergyError as ex: LOG.error("Exception has occured", exc_info=1) LOG.error(ex) @@ -453,7 +346,7 @@ class SchedulerManager(Manager): dynamic_worker = Worker("DYNAMIC", self.dynamic_queue, - self.projects, + self.project_manager, self.nova_manager, self.keystone_manager, self.backfill_depth) @@ -482,11 +375,10 @@ class SchedulerManager(Manager): def processRequest(self, request): server = request.getServer() - try: - if request.getProjectId() in self.projects: - self.nova_manager.setQuotaTypeServer(server) + project = self.project_manager.getProject(id=request.getProjectId()) - project = self.projects[request.getProjectId()] + try: + if project: quota = project.getQuota() retry = request.getRetry() num_attempts = 0 @@ -565,6 +457,6 @@ class SchedulerManager(Manager): priority)) else: self.nova_manager.buildServer(request) - except Exception as ex: + except SynergyError as ex: LOG.error("Exception has occured", exc_info=1) LOG.error(ex) diff --git a/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py b/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py index a7b7b2e..c7a297f 100755 --- a/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py +++ b/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py @@ -17,9 +17,7 @@ from mock import patch from synergy_scheduler_manager.common.project import Project from synergy_scheduler_manager.common.user import User from synergy_scheduler_manager.fairshare_manager import FairShareManager -from synergy_scheduler_manager.keystone_manager import KeystoneManager -from synergy_scheduler_manager.queue_manager import QueueManager -from synergy_scheduler_manager.quota_manager import QuotaManager +from synergy_scheduler_manager.project_manager import ProjectManager from synergy_scheduler_manager.tests.unit import base @@ -27,92 +25,32 @@ class TestFairshareManager(base.TestCase): def setUp(self): super(TestFairshareManager, self).setUp() - self.fsmanager = FairShareManager() + self.fairshare_manager = FairShareManager() + self.project_manager = ProjectManager() # NOTE(vincent): we cannot import NovaManager in our tests. # NovaManager depends on the "nova" package (not novaclient), but it is # not available on PyPI so the test runner will fail to install it. nova_manager_mock = MagicMock() - self.fsmanager.managers = { + self.fairshare_manager.managers = { 'NovaManager': nova_manager_mock(), - 'QueueManager': QueueManager(), - 'QuotaManager': QuotaManager(), - 'KeystoneManager': KeystoneManager()} + 'ProjectManager': self.project_manager} # Mock the configuration since it is initiliazed by synergy-service. with patch('synergy_scheduler_manager.fairshare_manager.CONF'): - self.fsmanager.setup() - - def test_add_project(self): - project = Project() - project.setId(1) - project.setName("test_project") - prj_share = project.getShare() - prj_share.setValue(5) - self.fsmanager.addProject(project) - - self.assertEqual(1, self.fsmanager.projects[1].getId()) - self.assertEqual("test_project", self.fsmanager.projects[1].getName()) - self.assertEqual([], self.fsmanager.projects[1].getUsers()) - self.assertEqual(5, self.fsmanager.projects[1].getShare().getValue()) - - def test_add_project_no_share(self): - project = Project() - project.setId(1) - project.setName("test_project") - self.fsmanager.addProject(project) - - self.assertEqual(1, self.fsmanager.projects[1].getId()) - self.assertEqual("test_project", self.fsmanager.projects[1].getName()) - self.assertEqual([], self.fsmanager.projects[1].getUsers()) - self.assertEqual(self.fsmanager.default_share, - self.fsmanager.projects[1].getShare().getValue()) - - def test_get_project(self): - project = Project() - project.setId(1) - project.setName("test_project") - self.fsmanager.addProject(project) - - self.assertEqual(project, self.fsmanager.getProject(1)) - - def test_get_projects(self): - project1 = Project() - project1.setId(1) - project1.setName("test1") - self.fsmanager.addProject(project1) - - project2 = Project() - project2.setId(2) - project2.setName("test2") - self.fsmanager.addProject(project2) - - expected_projects = { - 1: project1, - 2: project2} - self.assertEqual(expected_projects, self.fsmanager.getProjects()) - - def test_remove_project(self): - project = Project() - project.setId(1) - project.setName("test_project") - self.fsmanager.addProject(project) - - self.assertIn(1, self.fsmanager.projects) - self.fsmanager.removeProject(1) - self.assertNotIn(1, self.fsmanager.projects) + self.fairshare_manager.setup() def test_calculate_priority_one_user(self): - # self.fsmanager.addProject(prj_id=1, prj_name="test") + # self.fairshare_manager.addProject(prj_id=1, prj_name="test") project = Project() project.setId(1) project.setName("test_project") # Define values used for computing the priority - age_weight = self.fsmanager.age_weight = 1.0 - vcpus_weight = self.fsmanager.vcpus_weight = 2.0 - memory_weight = self.fsmanager.memory_weight = 3.0 + age_weight = self.fairshare_manager.age_weight = 1.0 + vcpus_weight = self.fairshare_manager.vcpus_weight = 2.0 + memory_weight = self.fairshare_manager.memory_weight = 3.0 datetime_start = datetime(year=2000, month=1, day=1, hour=0, minute=0) datetime_stop = datetime(year=2000, month=1, day=1, hour=2, minute=0) minutes = (datetime_stop - datetime_start).seconds / 60 @@ -128,7 +66,7 @@ class TestFairshareManager(base.TestCase): priority.setFairShare('memory', fairshare_ram) project.addUser(user) - self.fsmanager.addProject(project) + self.project_manager.projects[project.getId()] = project # Compute the expected priority given the previously defined values expected_priority = int(age_weight * minutes + @@ -138,7 +76,8 @@ class TestFairshareManager(base.TestCase): with patch("synergy_scheduler_manager.fairshare_manager.datetime") \ as datetime_mock: datetime_mock.utcnow.side_effect = (datetime_start, datetime_stop) - priority = self.fsmanager.calculatePriority(user_id=22, prj_id=1) + priority = self.fairshare_manager.calculatePriority( + user.getId(), project.getId()) self.assertEqual(expected_priority, priority) diff --git a/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py b/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py index 6924b7b..499ded6 100644 --- a/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py +++ b/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py @@ -19,6 +19,7 @@ from synergy_scheduler_manager.common.queue import QueueDB from synergy_scheduler_manager.common.queue import QueueItem from synergy_scheduler_manager.common.quota import SharedQuota from synergy_scheduler_manager.common.server import Server +from synergy_scheduler_manager.project_manager import ProjectManager from synergy_scheduler_manager.scheduler_manager import Notifications from synergy_scheduler_manager.scheduler_manager import Worker from synergy_scheduler_manager.tests.unit import base @@ -130,19 +131,36 @@ class TestWorker(base.TestCase): self.keystone_manager_mock = MagicMock() db_engine_mock = create_autospec(Engine) - project1 = Project() - project1.setId("1") - project1.setName("test1") + def my_side_effect(*args, **kwargs): + project1 = Project() + project1.setId(1) + project1.setName("test_project1") + project1.getShare().setValue(5) - project2 = Project() - project2.setId("2") - project2.setName("test2") + project2 = Project() + project2.setId(2) + project2.setName("test_project2") + project2.getShare().setValue(55) + + if args[0] == 1: + return project1 + elif args[0] == 2: + return project2 + + keystone_manager = MagicMock() + keystone_manager.getProject.side_effect = my_side_effect + + self.project_manager = ProjectManager() + self.project_manager.db_engine = MagicMock() + self.project_manager.keystone_manager = keystone_manager + self.project_manager.default_TTL = 10 + self.project_manager.default_share = 30 + self.project_manager._addProject(1, "test_project1", 10, 50) - projects_list = {'1': project1, '2': project2} self.worker = Worker( name="test", queue=QueueDB("testq", db_engine_mock), - projects=projects_list, + project_manager=self.project_manager, nova_manager=self.nova_manager_mock, keystone_manager=self.keystone_manager_mock) @@ -181,11 +199,11 @@ class TestWorker(base.TestCase): nova_exec = self.nova_manager_mock.execute nova_exec.side_effect = nova_exec_side_effect + project = self.project_manager.getProject(id=1) # Mock quota allocation - quota_allocate_mock = create_autospec( - self.worker.projects['1'].getQuota().allocate) + quota_allocate_mock = create_autospec(project.getQuota().allocate) quota_allocate_mock.return_value = True - self.worker.projects['1'].getQuota().allocate = quota_allocate_mock + project.getQuota().allocate = quota_allocate_mock # Delete item from the queue delete_item_mock = create_autospec(self.worker.queue.deleteItem)