diff --git a/config/synergy_scheduler.conf b/config/synergy_scheduler.conf index 837db04..e1878cf 100644 --- a/config/synergy_scheduler.conf +++ b/config/synergy_scheduler.conf @@ -7,21 +7,6 @@ autostart = True # set the manager rate (minutes) rate = 1 -# set the list of projects accessing to the shared quota -# projects = prj_a, prj_b -#projects = - -# set the projects share -# shares = prj_a=70, prj_b=30 -#shares = - -# set the default max time to live (minutes) for VM/Container -default_TTL = 2880 - -# set, for the specified projects, the max time to live (minutes) for VM/Container -# TTLs = prj_a=1440, prj_b=2880 -#TTLs = - # set the max depth used by the backfilling strategy (default: 100) # this allows Synergy to not check the whole queue when looking for VMs to start backfill_depth = 100 @@ -200,3 +185,30 @@ autostart = True # set the manager rate (minutes) rate = 5 + + +[ProjectManager] +autostart = True + +# set the manager rate (minutes) +rate = 60 + +# set the Synergy database connection: +db_connection = DIALECT+DRIVER://USER:PASSWORD@DB_HOST/synergy + +# set the connection pool size (default: 10) +db_pool_size = 10 + +# set the number of seconds after which a connection is automatically +# recycled (default: 30) +db_pool_recycle = 30 + +# set the max overflow (default: 5) +db_max_overflow = 5 + +# set the default max time to live (minutes) for VM/Container +default_TTL = 2880 + +# set the default share value (default: 10) +default_share = 10 + diff --git a/setup.cfg b/setup.cfg index 188914b..a5bd87d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,8 +27,10 @@ synergy.managers = QuotaManager = synergy_scheduler_manager.quota_manager:QuotaManager FairShareManager = synergy_scheduler_manager.fairshare_manager:FairShareManager SchedulerManager = synergy_scheduler_manager.scheduler_manager:SchedulerManager + ProjectManager = synergy_scheduler_manager.project_manager:ProjectManager synergy.commands = + project = synergy_scheduler_manager.client.command:ProjectCommand quota = synergy_scheduler_manager.client.command:QuotaCommand queue = synergy_scheduler_manager.client.command:QueueCommand usage = synergy_scheduler_manager.client.command:UsageCommand diff --git a/synergy_scheduler_manager/client/command.py b/synergy_scheduler_manager/client/command.py index fa54950..ffedd8c 100644 --- a/synergy_scheduler_manager/client/command.py +++ b/synergy_scheduler_manager/client/command.py @@ -24,6 +24,189 @@ See the License for the specific language governing permissions and limitations under the License.""" +class ProjectCommand(ExecuteCommand): + + def __init__(self): + super(ProjectCommand, self).__init__("ProjectCommand") + + def configureParser(self, subparser): + prj_parser = subparser.add_parser('project') + prj_subparsers = prj_parser.add_subparsers(dest="command") + prj_subparsers.add_parser("list", add_help=True, + help="shows the projects list") + + show_parser = prj_subparsers.add_parser("show", add_help=True, + help="shows the project info") + group = show_parser.add_mutually_exclusive_group(required=True) + group.add_argument("-i", "--id", metavar="") + group.add_argument("-n", "--name", metavar="") + group.add_argument("-a", "--all", action="store_true") + show_parser.add_argument("-r", "--share", action="store_true") + show_parser.add_argument("-t", "--ttl", action="store_true") + show_parser.add_argument("-p", "--p_quota", action="store_true") + show_parser.add_argument("-s", "--s_quota", action="store_true") + show_parser.add_argument("-q", "--queue", action="store_true") + show_parser.add_argument("-l", "--long", action="store_true") + show_parser.add_argument("-u", "--usage", action="store_true") + + add_parser = prj_subparsers.add_parser("add", add_help=True, + help="adds a new project") + group = add_parser.add_mutually_exclusive_group(required=True) + group.add_argument("-i", "--id", metavar="") + group.add_argument("-n", "--name", metavar="") + add_parser.add_argument("-s", "--share", metavar="") + add_parser.add_argument("-t", "--ttl", metavar="") + + remove_parser = prj_subparsers.add_parser("remove", add_help=True, + help="removes a project") + group = remove_parser.add_mutually_exclusive_group(required=True) + group.add_argument("-i", "--id", metavar="") + group.add_argument("-n", "--name", metavar="") + + set_parser = prj_subparsers.add_parser("set", add_help=True, + help="sets the project values") + group = set_parser.add_mutually_exclusive_group(required=True) + group.add_argument("-i", "--id", metavar="") + group.add_argument("-n", "--name", metavar="") + set_parser.add_argument("-s", "--share", metavar="") + set_parser.add_argument("-t", "--ttl", metavar="") + + def execute(self, synergy_url, args): + id = getattr(args, 'id', None) + name = getattr(args, 'name', None) + command = getattr(args, 'command', None) + headers = ["name"] + + if command == "list": + cmd_args = {} + command = "GET_PROJECTS" + + elif command == "show": + if args.all: + cmd_args = {} + command = "GET_PROJECTS" + else: + cmd_args = {"id": id, "name": name} + command = "GET_PROJECT" + + if args.long: + headers.insert(0, "id") + if args.usage: + headers.append("usage") + if args.p_quota: + headers.append("private quota") + if args.s_quota: + headers.append("shared quota") + if args.queue: + headers.append("queue") + if args.share: + headers.append("share") + if args.ttl: + headers.append("TTL") + + elif command == "remove": + cmd_args = {"id": id, "name": name} + command = "REMOVE_PROJECT" + + else: + cmd_args = {"id": id, "name": name} + + TTL = getattr(args, 'ttl', None) + share = getattr(args, 'share', None) + + if TTL: + cmd_args["TTL"] = TTL + + if share: + cmd_args["share"] = share + + if command == "add": + command = "ADD_PROJECT" + headers.append("share") + headers.append("TTL") + elif command == "set": + command = "UPDATE_PROJECT" + + if TTL: + headers.append("TTL") + + if share: + headers.append("share") + + result = super(ProjectCommand, self).execute(synergy_url, + "ProjectManager", + command, + args=cmd_args) + + if isinstance(result, Project): + self.printProjects([result], headers) + else: + self.printProjects(result, headers) + + def printProjects(self, projects, headers): + if not projects: + return + + table = [] + + for project in projects: + row = [] + for attribute in headers: + if attribute == "id": + row.append(project.getId()) + + if attribute == "name": + row.append(project.getName()) + + if attribute == "share": + share = project.getShare() + share_value = share.getValue() + share_norm = share.getNormalizedValue() + row.append("{:.2f}% | {:.2f}%".format(share_value, + share_norm * 100)) + + if attribute == "TTL": + row.append(project.getTTL()) + + if attribute == "private quota": + quota = project.getQuota() + + private = "vcpus: {:.1f} of {:.1f} | ram: "\ + "{:.1f} of {:.1f}".format(quota.getUsage("vcpus"), + quota.getSize("vcpus"), + quota.getUsage("memory"), + quota.getSize("memory")) + + row.append(private) + + if attribute == "shared quota": + quota = project.getQuota() + + vcpus_size = quota.getSize("vcpus", private=False) + vcpus_usage = quota.getUsage("vcpus", private=False) + memory_size = quota.getSize("memory", private=False) + memory_usage = quota.getUsage("memory", private=False) + + shared = "vcpus: {:.1f} of {:.1f} | "\ + "ram: {:.1f} of {:.1f}".format( + vcpus_usage, vcpus_size, + memory_usage, memory_size) + + row.append(shared) + + if attribute == "usage": + data = project.getData() + + usage = "vcpus: {:.2f}% | ram: {:.2f}%".format( + data["effective_vcpus"] * 100, + data["effective_memory"] * 100) + + row.append(usage) + table.append(row) + + print(tabulate(table, headers, tablefmt="fancy_grid")) + + class QueueCommand(ExecuteCommand): def __init__(self): diff --git a/synergy_scheduler_manager/fairshare_manager.py b/synergy_scheduler_manager/fairshare_manager.py index 174cb0f..031fccd 100644 --- a/synergy_scheduler_manager/fairshare_manager.py +++ b/synergy_scheduler_manager/fairshare_manager.py @@ -1,10 +1,11 @@ import logging -import threading from datetime import datetime from datetime import timedelta from oslo_config import cfg from synergy.common.manager import Manager +from synergy.exception import SynergyError + __author__ = "Lisa Zangrando" __email__ = "lisa.zangrando[AT]pd.infn.it" @@ -47,16 +48,13 @@ class FairShareManager(Manager): def setup(self): if self.getManager("NovaManager") is None: - raise Exception("NovaManager not found!") + raise SynergyError("NovaManager not found!") - if self.getManager("QueueManager") is None: - raise Exception("QueueManager not found!") + if self.getManager("ProjectManager") is None: + raise SynergyError("ProjectManager not found!") - if self.getManager("QuotaManager") is None: - raise Exception("QuotaManager not found!") - - if self.getManager("KeystoneManager") is None: - raise Exception("KeystoneManager not found!") + self.nova_manager = self.getManager("NovaManager") + self.project_manager = self.getManager("ProjectManager") self.periods = CONF.FairShareManager.periods self.period_length = CONF.FairShareManager.period_length @@ -65,68 +63,32 @@ class FairShareManager(Manager): self.vcpus_weight = CONF.FairShareManager.vcpus_weight self.age_weight = CONF.FairShareManager.age_weight self.memory_weight = CONF.FairShareManager.memory_weight - self.projects = {} - self.workers = [] - self.exit = False - self.nova_manager = self.getManager("NovaManager") - self.queue_manager = self.getManager("QueueManager") - self.quota_manager = self.getManager("QuotaManager") - self.keystone_manager = self.getManager("KeystoneManager") - self.fs_condition = threading.Condition() if self.decay_weight < 0: self.decay_weight = float(0) elif self.decay_weight > 1: self.decay_weight = float(1) - def execute(self, command, *args, **kargs): - if command == "ADD_PROJECT": - return self.addProject(*args, **kargs) - elif command == "GET_PROJECT": - return self.getProject(*args, **kargs) - elif command == "GET_PROJECTS": - return self.getProjects() - elif command == "REMOVE_PROJECT": - return self.removeProject(*args, **kargs) - elif command == "GET_PRIORITY": - result = {} - for prj_id, project in self.projects.items(): - users = {} - - for user_id, user in project["users"].items(): - p = self.calculatePriority(user_id=user_id, prj_id=prj_id) - users[user["name"]] = p - - result[project["name"]] = users - return result - elif command == "CALCULATE_PRIORITY": - return self.calculatePriority(*args, **kargs) - elif command == "CALCULATE_FAIRSHARE": - return self.calculateFairShare(*args, **kargs) - else: - raise Exception("command=%r not supported!" % command) - def task(self): - with self.fs_condition: - try: - self.checkUsers() - self.calculateFairShare() - except Exception as ex: - LOG.error(ex) - raise ex - finally: - self.fs_condition.notifyAll() + try: + self._calculateFairShare() + except SynergyError as ex: + LOG.error(ex) + raise ex def destroy(self): pass def calculatePriority(self, user_id, prj_id, timestamp=None, retry=0): - if prj_id not in self.projects: - raise Exception("project=%s not found!" % prj_id) + project = self.project_manager.getProject(id=prj_id) + + if not project: + raise SynergyError("project=%s not found!" % prj_id) + + user = project.getUser(id=user_id) - user = self.projects[prj_id].getUser(id=user_id) if not user: - raise Exception("user=%s not found!" % user_id) + raise SynergyError("user=%s not found!" % user_id) priority = user.getPriority() fairshare_vcpus = priority.getFairShare("vcpus") @@ -146,117 +108,45 @@ class FairShareManager(Manager): return int(priority) - def addProject(self, project): - if self.projects.get(project.getId(), None): - raise Exception("project %s already exists!" % (project.getId())) + def doOnEvent(self, event_type, *args, **kwargs): + if event_type == "USER_ADDED": + user = kwargs.get("user", None) - prj_share = project.getShare() - if prj_share.getValue() == 0: - prj_share.setValue(self.default_share) + if not user: + return - self.projects[project.getId()] = project + share = user.getShare() - def getProject(self, prj_id): - if prj_id not in self.projects: - raise Exception("project name=%r not found!" % prj_id) + if not share or share.getValue() == 0: + share.setValue(self.default_share) - return self.projects.get(prj_id, None) + elif event_type == "PROJECT_ADDED": + project = kwargs.get("project", None) - def getProjects(self): - return self.projects + if not project: + return - def removeProject(self, prj_id): - if prj_id in self.projects: - with self.fs_condition: - del self.projects[prj_id] - self.fs_condition.notifyAll() + share = project.getShare() - def checkUsers(self): - if not self.projects: + if not share or share.getValue() == 0: + share.setValue(self.default_share) + + elif event_type == "PROJECT_REMOVED" or\ + event_type == "PROJECT_UPDATED": + pass + else: return - for project in self.projects.values(): - k_users = self.keystone_manager.getUsers(prj_id=project.getId()) - k_users_ids = [user.getId() for user in k_users] - - p_users = project.getUsers() - p_users_ids = [user.getId() for user in p_users] - - new_user_ids = list(set(k_users_ids) - set(p_users_ids)) - - deleted_user_ids = list(set(p_users_ids) - set(k_users_ids)) - for id in deleted_user_ids: - LOG.info("deleting user %s" % id) - project.removeUser(id) - - for user in k_users: - if user.getId() in new_user_ids: - LOG.info("found new user %s" % user.getName()) - date = datetime.utcnow() - - data = user.getData() - data["vcpus"] = float(0) - data["memory"] = float(0) - data["actual_memory"] = float(0) - data["actual_vcpus"] = float(0) - data["time_window_from_date"] = date - data["time_window_to_date"] = date - - try: - project.addUser(user) - except Exception: - pass - - def calculateFairShare(self): - if not self.projects: - return + self._calculateShares() + self._calculateFairShare() + def _calculateShares(self): total_prj_share = float(0) - total_memory = float(0) - total_vcpus = float(0) + projects = self.project_manager.getProjects() - to_date = datetime.utcnow() - - time_window_from_date = to_date - time_window_to_date = to_date - - for prj_id, project in self.projects.items(): - for user in project.getUsers(): - data = user.getData() - data["vcpus"] = float(0) - data["memory"] = float(0) - - for period in xrange(self.periods): - decay = self.decay_weight ** period - from_date = to_date - timedelta(days=(self.period_length)) - time_window_from_date = from_date - - for prj_id, project in self.projects.items(): - usages = self.nova_manager.getProjectUsage( - prj_id, from_date, to_date) - - for user_id, usage_rec in usages.items(): - decay_vcpus = decay * usage_rec["vcpus"] - decay_memory = decay * usage_rec["memory"] - - user = project.getUser(id=user_id) - - if user: - data = user.getData() - data["vcpus"] += decay_vcpus - data["memory"] += decay_memory - - total_vcpus += decay_vcpus - total_memory += decay_memory - - to_date = from_date - - for project in self.projects.values(): + for project in projects.values(): prj_share = project.getShare() - if prj_share.getValue() == 0: - prj_share.setValue(self.default_share) - # check the share for each user and update the usage_record sibling_share = float(0) @@ -278,15 +168,7 @@ class FairShareManager(Manager): total_prj_share += prj_share.getValue() - for prj_id, project in self.projects.items(): - prj_data = project.getData() - prj_data["actual_memory"] = float(0) - prj_data["actual_vcpus"] = float(0) - prj_data["effective_memory"] = float(0) - prj_data["effective_vcpus"] = float(0) - prj_data["time_window_from_date"] = time_window_from_date - prj_data["time_window_to_date"] = time_window_to_date - + for project in projects.values(): prj_share = project.getShare() prj_share.setSiblingValue(total_prj_share) prj_share.setNormalizedValue( @@ -300,10 +182,75 @@ class FairShareManager(Manager): usr_share.getValue() / usr_share.getSiblingValue() * prj_share.getNormalizedValue()) + def _calculateFairShare(self): + projects = self.project_manager.getProjects() + + if not projects: + return + + total_memory = float(0) + total_vcpus = float(0) + + to_date = datetime.utcnow() + + time_window_from_date = to_date + time_window_to_date = to_date + + for prj_id, project in projects.items(): + prj_data = project.getData() + prj_data["actual_vcpus"] = float(0) + prj_data["actual_memory"] = float(0) + prj_data["effective_vcpus"] = float(0) + prj_data["effective_memory"] = float(0) + prj_data["time_window_from_date"] = time_window_from_date + prj_data["time_window_to_date"] = time_window_to_date + + for user in project.getUsers(): + usr_data = user.getData() + usr_data["vcpus"] = float(0) + usr_data["memory"] = float(0) + usr_data["actual_vcpus"] = float(0) + usr_data["actual_memory"] = float(0) + usr_data["effective_vcpus"] = float(0) + usr_data["effective_memory"] = float(0) + usr_data["actual_rel_vcpus"] = float(0) + usr_data["actual_rel_memory"] = float(0) + usr_data["time_window_from_date"] = time_window_from_date + usr_data["time_window_to_date"] = time_window_to_date + + for period in xrange(self.periods): + decay = self.decay_weight ** period + from_date = to_date - timedelta(days=(self.period_length)) + time_window_from_date = from_date + + for prj_id, project in projects.items(): + usages = self.nova_manager.getProjectUsage( + prj_id, from_date, to_date) + + for user_id, usage_rec in usages.items(): + decay_vcpus = decay * usage_rec["vcpus"] + decay_memory = decay * usage_rec["memory"] + + user = project.getUser(id=user_id) + + if user: + data = user.getData() + data["vcpus"] += decay_vcpus + data["memory"] += decay_memory + + total_vcpus += decay_vcpus + total_memory += decay_memory + + to_date = from_date + + for prj_id, project in projects.items(): + prj_data = project.getData() + prj_data["time_window_to_date"] = time_window_to_date + + for user in project.getUsers(): usr_data = user.getData() usr_data["actual_memory"] = usr_data["memory"] usr_data["actual_vcpus"] = usr_data["vcpus"] - usr_data["time_window_from_date"] = time_window_from_date usr_data["time_window_to_date"] = time_window_to_date if total_memory > 0: @@ -315,7 +262,7 @@ class FairShareManager(Manager): prj_data["actual_memory"] += usr_data["actual_memory"] prj_data["actual_vcpus"] += usr_data["actual_vcpus"] - for project in self.projects.values(): + for project in projects.values(): prj_data = project.getData() prj_data["effective_memory"] = prj_data["actual_memory"] prj_data["effective_vcpus"] = prj_data["actual_vcpus"] @@ -327,10 +274,6 @@ class FairShareManager(Manager): sibling_share = usr_share.getSiblingValue() norm_share = usr_share.getNormalizedValue() usr_data = user.getData() - usr_data["effective_vcpus"] = float(0) - usr_data["effective_memory"] = float(0) - usr_data["actual_rel_vcpus"] = float(0) - usr_data["actual_rel_memory"] = float(0) if prj_data["actual_vcpus"] > 0: usr_data["actual_rel_vcpus"] = usr_data["actual_vcpus"] @@ -361,5 +304,3 @@ class FairShareManager(Manager): usr_priority.setValue(float(self.vcpus_weight) * f_vcpus + float(self.memory_weight) * f_memory) - - LOG.debug("fairshare project %s" % project.serialize()) diff --git a/synergy_scheduler_manager/project_manager.py b/synergy_scheduler_manager/project_manager.py new file mode 100644 index 0000000..3c7e734 --- /dev/null +++ b/synergy_scheduler_manager/project_manager.py @@ -0,0 +1,354 @@ +import logging + +from common.project import Project +from oslo_config import cfg +from sqlalchemy import create_engine +from sqlalchemy.exc import SQLAlchemyError +from synergy.common.manager import Manager +from synergy.exception import SynergyError + + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. +See the License for the specific language governing +permissions and limitations under the License.""" + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class ProjectManager(Manager): + + def __init__(self): + super(ProjectManager, self).__init__(name="ProjectManager") + + self.config_opts = [ + cfg.IntOpt("default_TTL", default=1440, required=False), + cfg.FloatOpt("default_share", default=10.0, required=False), + cfg.StrOpt("db_connection", help="the DB url", required=True), + cfg.IntOpt('db_pool_size', default=10, required=False), + cfg.IntOpt('db_pool_recycle', default=30, required=False), + cfg.IntOpt('db_max_overflow', default=5, required=False) + ] + + self.projects = {} + + def setup(self): + self.default_TTL = CONF.ProjectManager.default_TTL + self.default_share = CONF.ProjectManager.default_share + + db_connection = CONF.ProjectManager.db_connection + pool_size = CONF.ProjectManager.db_pool_size + pool_recycle = CONF.ProjectManager.db_pool_recycle + max_overflow = CONF.ProjectManager.db_max_overflow + + try: + self.db_engine = create_engine(db_connection, + pool_size=pool_size, + pool_recycle=pool_recycle, + max_overflow=max_overflow) + except SQLAlchemyError as ex: + LOG.error(ex) + raise ex + + self.configured = False + self.keystone_manager = self.getManager("KeystoneManager") + self.createTable() + + def task(self): + if not self.configured: + self.buildFromDB() + self.configured = True + + def destroy(self): + pass + + def execute(self, command, *args, **kargs): + if command == "GET_PROJECTS": + return self.projects.values() + + prj_id = kargs.get("id", None) + prj_name = kargs.get("name", None) + + project = self.getProject(prj_id, prj_name) + + if command == "GET_PROJECT": + if project: + return project + else: + raise SynergyError("project not found!") + + elif command == "ADD_PROJECT": + if project: + raise SynergyError("the project id=%s name=%s already exists!" + % (project.getId(), project.getName())) + + TTL = kargs.get("TTL", None) + share = kargs.get("share", None) + + return self._addProject(prj_id, prj_name, TTL, share) + + elif command == "UPDATE_PROJECT": + if not project: + raise SynergyError("project not found!") + + TTL = kargs.get("TTL", None) + share = kargs.get("share", None) + + return self._updateProject(project, TTL, share) + + elif command == "REMOVE_PROJECT": + if not project: + raise SynergyError("project not found!") + + self._removeProject(project) + else: + raise SynergyError("command %r not supported!" % command) + + def doOnEvent(self, event_type, *args, **kwargs): + if event_type == "identity.role_assignment.created": + usr_id = kwargs.get("user", None) + prj_id = kwargs.get("project", None) + project = self.getProject(id=prj_id) + + if project and not project.getUser(id=usr_id): + user = self.keystone_manager.getUser(usr_id) + if user: + project.addUser(user) + + self.notify(event_type="USER_ADDED", user=user) + + elif event_type == "identity.role_assignment.deleted": + usr_id = kwargs.get("user", None) + prj_id = kwargs.get("project", None) + project = self.getProject(id=prj_id) + + if project: + project.removeUser(usr_id) + self.notify(event_type="USER_REMOVED", user=user) + + elif event_type == "identity.user.deleted": + user_id = kwargs.get("resource_info", None) + + for project in self.projects.values(): + try: + user = project.getUser(id=user_id) + if user: + project.removeUser(user_id) + + self.notify(event_type="USER_DELETED", user=user) + except SynergyError as ex: + LOG.info(ex) + + def _parseNumber(self, value, default=None): + if not value: + return default + + try: + return int(value) + except SynergyError: + if default: + return default + raise SynergyError("%r is not a number!" % str(value)) + + def _addProject(self, prj_id, prj_name, TTL, share): + if prj_id: + project = self.keystone_manager.getProject(prj_id) + elif prj_name: + projects = self.keystone_manager.getProjects(name=prj_name) + + if len(projects) > 1: + raise SynergyError("ambiguity: found %s projects having %r" + " as name" % (len(projects), prj_name)) + if projects: + project = projects[0] + else: + raise SynergyError("missing project attributes") + + if not project: + raise SynergyError("project not found!") + + prj_TTL = self._parseNumber(TTL, default=self.default_TTL) + prj_share = self._parseNumber(share, 0) + + project.setTTL(prj_TTL) + project.getShare().setValue(prj_share) + + QUERY = "insert into project (id, name, share, TTL) " \ + "values (%s, %s, %s, %s)" + + connection = self.db_engine.connect() + trans = connection.begin() + + try: + connection.execute( + QUERY, [project.getId(), project.getName(), + project.getShare().getValue(), project.getTTL()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + + if "Duplicate entry" in ex.message: + raise SynergyError("the project id=%s name=%s already exists!" + % (project.getId(), project.getName())) + else: + raise(ex) + finally: + connection.close() + + self.projects[project.getId()] = project + + LOG.info("added project %s" % project.getName()) + self.notify(event_type="PROJECT_ADDED", project=project) + + return project + + def _updateProject(self, project, TTL, share): + if not project: + return + + TTL = self._parseNumber(TTL) + if TTL: + TTL = self._parseNumber(TTL) + if TTL <= 0: + raise SynergyError("wrong TTL value: %s <= 0" % TTL) + project.setTTL(TTL) + + share = self._parseNumber(share) + if share: + if share <= 0: + raise SynergyError("wrong share value: %s <= 0" % share) + project.getShare().setValue(share) + + connection = self.db_engine.connect() + trans = connection.begin() + + try: + QUERY = "update project set share=%s, TTL=%s where id=%s" + + connection.execute(QUERY, [project.getShare().getValue(), + project.getTTL(), + project.getId()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + + raise SynergyError(ex.message) + finally: + connection.close() + + self.notify(event_type="PROJECT_UPDATED", project=project) + + def _removeProject(self, project): + if project.getId() not in self.projects.keys(): + raise SynergyError("project %s not found!" % project.getId()) + + self.projects.pop(project.getId()) + + connection = self.db_engine.connect() + trans = connection.begin() + + try: + QUERY = "delete from project where id=%s" + + connection.execute(QUERY, [project.getId()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + + raise SynergyError(ex.message) + finally: + connection.close() + + LOG.info("removed project %s" % project.getName()) + self.notify(event_type="PROJECT_REMOVED", project=project) + + def getProject(self, id=None, name=None): + if not id and not name: + raise SynergyError("please define the project id or its name!") + + project = None + + if id: + project = self.projects.get(id, None) + elif name: + for prj in self.projects.values(): + if name == prj.getName(): + project = prj + break + + return project + + def getProjects(self): + return self.projects + + def createTable(self): + TABLE = """CREATE TABLE IF NOT EXISTS project (`id` VARCHAR(64) \ +NOT NULL PRIMARY KEY, name VARCHAR(64), share INT DEFAULT 0, TTL INT DEFAULT \ +1440) ENGINE=InnoDB""" + + connection = self.db_engine.connect() + + try: + connection.execute(TABLE) + except SQLAlchemyError as ex: + raise SynergyError(ex.message) + except Exception as ex: + raise SynergyError(ex.message) + finally: + connection.close() + + def buildFromDB(self): + connection = self.db_engine.connect() + + try: + QUERY = "select id, name, share, TTL from project" + result = connection.execute(QUERY) + + for row in result: + project = Project() + project.setId(row[0]) + project.setName(row[1]) + project.getShare().setValue(row[2]) + project.setTTL(row[3]) + project.setId(row[0]) + project_id = project.getId() + try: + k_project = self.keystone_manager.getProject(project_id) + + if not k_project: + self.removeProject(project) + continue + + users = self.keystone_manager.getUsers(prj_id=project_id) + + for user in users: + project.addUser(user) + + self.projects[project.getId()] = project + + self.notify(event_type="PROJECT_ADDED", project=project) + except SynergyError as ex: + LOG.info("the project %s seems not to exist anymore! " + "(reason=%s)" % (project.getName(), ex.message)) + self.removeProject(project) + except SQLAlchemyError as ex: + raise SynergyError(ex.message) + finally: + connection.close() diff --git a/synergy_scheduler_manager/quota_manager.py b/synergy_scheduler_manager/quota_manager.py index 750c628..ce04daf 100644 --- a/synergy_scheduler_manager/quota_manager.py +++ b/synergy_scheduler_manager/quota_manager.py @@ -4,6 +4,7 @@ import logging from common.quota import SharedQuota from oslo_config import cfg from synergy.common.manager import Manager +from synergy.exception import SynergyError __author__ = "Lisa Zangrando" @@ -39,152 +40,143 @@ class QuotaManager(Manager): self.projects = {} if self.getManager("NovaManager") is None: - raise Exception("NovaManager not found!") + raise SynergyError("NovaManager not found!") if self.getManager("KeystoneManager") is None: - raise Exception("KeystoneManager not found!") + raise SynergyError("KeystoneManager not found!") + + if self.getManager("ProjectManager") is None: + raise SynergyError("ProjectManager not found!") self.nova_manager = self.getManager("NovaManager") self.keystone_manager = self.getManager("KeystoneManager") - self.listener = None + self.project_manager = self.getManager("ProjectManager") def destroy(self): LOG.info("destroy invoked!") SharedQuota.disable() def execute(self, command, *args, **kargs): - if command == "show": - prj_id = kargs.get("project_id", None) - prj_name = kargs.get("project_name", None) - all_projects = kargs.get("all_projects", None) - - if prj_id: - project = self.projects.get(prj_id, None) + if command == "GET_PRIVATE_QUOTA": + prj_id = kargs.get("id", None) + prj_name = kargs.get("name", None) + project = self.project_manager.getProject(prj_id, prj_name) + if project: + return project.getQuota() + else: + raise SynergyError("project not found!") if project: return project - - raise Exception("project (id=%r) not found!" % prj_id) - elif prj_name: - for project in self.projects.values(): - if prj_name == project.getName(): - return project - - raise Exception("project (name=%r) not found!" % prj_name) - elif all_projects: - return self.projects.values() - else: - return SharedQuota() elif command == "GET_SHARED_QUOTA": return SharedQuota() - elif command == "GET_PROJECTS": - return self.projects.values() - elif command == "GET_PROJECT": - return self.getProject(*args, **kargs) else: - raise Exception("command=%r not supported!" % command) + raise SynergyError("command %r not supported!" % command) def task(self): try: self.updateSharedQuota() self.deleteExpiredServers() - except Exception as ex: + except SynergyError as ex: LOG.error(ex) - def getProject(self, prj_id): - return self.projects.get(prj_id, None) + def doOnEvent(self, event_type, *args, **kwargs): + if event_type == "PROJECT_ADDED": + project = kwargs.get("project", None) - def getProjects(self): - return self.projects + if not project: + return - def addProject(self, project): - if self.projects.get(project.getId(), None): - raise Exception("project %r already exists!" % (project.getId())) + try: + quota = self.nova_manager.getQuota(project.getId()) + + if quota.getSize("vcpus") > 0 and \ + quota.getSize("memory") > 0 and \ + quota.getSize("instances") > 0: + self.nova_manager.updateQuota(quota, is_class=True) + + quota.setSize("vcpus", -1) + quota.setSize("memory", -1) + quota.setSize("instances", -1) + + self.nova_manager.updateQuota(quota) + + class_quota = self.nova_manager.getQuota( + project.getId(), is_class=True) + + quota = project.getQuota() + quota.setId(project.getId()) + quota.setSize("vcpus", class_quota.getSize("vcpus")) + quota.setSize("memory", class_quota.getSize("memory")) + quota.setSize("instances", class_quota.getSize("instances")) + quota.setSize( + "vcpus", SharedQuota.getSize("vcpus"), private=False) + quota.setSize( + "memory", SharedQuota.getSize("memory"), private=False) + quota.setSize( + "instances", SharedQuota.getSize("instances"), + private=False) + + servers = self.nova_manager.getProjectServers(project.getId()) + + for server in servers: + if server.getState() != "building": + try: + quota.allocate(server) + except SynergyError as ex: + fl = server.getFlavor() + vcpus_size = quota.getSize("vcpus") + fl.getVCPUs() + mem_size = quota.getSize("memory") + fl.getMemory() + + quota.setSize("vcpus", vcpus_size) + quota.setSize("memory", mem_size) + + self.nova_manager.updateQuota(quota, is_class=True) + + LOG.warn("private quota autoresized (vcpus=%s, " + "memory=%s) for project %r (id=%s)" + % (quota.getSize("vcpus"), + quota.getSize("memory"), + project.getName(), + project.getId())) + quota.allocate(server) + + self.projects[project.getId()] = project + except SynergyError as ex: + LOG.error(ex) + raise ex + + elif event_type == "PROJECT_REMOVED": + project = kwargs.get("project", None) + + if not project: + return - try: quota = self.nova_manager.getQuota(project.getId()) - if quota.getSize("vcpus") > 0 and \ - quota.getSize("memory") > 0 and \ - quota.getSize("instances") > 0: + if quota.getSize("vcpus") <= -1 and \ + quota.getSize("memory") <= -1 and \ + quota.getSize("instances") <= -1: - self.nova_manager.updateQuota(quota, is_class=True) - - quota.setSize("vcpus", -1) - quota.setSize("memory", -1) - quota.setSize("instances", -1) - - self.nova_manager.updateQuota(quota) - - class_quota = self.nova_manager.getQuota( - project.getId(), is_class=True) + qc = self.nova_manager.getQuota(project.getId(), is_class=True) + self.nova_manager.updateQuota(qc) quota = project.getQuota() - quota.setId(project.getId()) - quota.setSize("vcpus", class_quota.getSize("vcpus")) - quota.setSize("memory", class_quota.getSize("memory")) - quota.setSize("instances", class_quota.getSize("instances")) - quota.setSize( - "vcpus", SharedQuota.getSize("vcpus"), private=False) - quota.setSize( - "memory", SharedQuota.getSize("memory"), private=False) - quota.setSize( - "instances", SharedQuota.getSize("instances"), private=False) - servers = self.nova_manager.getProjectServers(project.getId()) - - for server in servers: - if server.getState() != "building": - try: - quota.allocate(server) - except Exception as ex: - flavor = server.getFlavor() - vcpus_size = quota.getSize("vcpus") + flavor.getVCPUs() - mem_size = quota.getSize("memory") + flavor.getMemory() - - quota.setSize("vcpus", vcpus_size) - quota.setSize("memory", mem_size) - - self.nova_manager.updateQuota(quota, is_class=True) - - LOG.warn("private quota autoresized (vcpus=%s, " - "memory=%s) for project %r (id=%s)" - % (quota.getSize("vcpus"), - quota.getSize("memory"), - project.getName(), - project.getId())) - quota.allocate(server) - - self.projects[project.getId()] = project - except Exception as ex: - LOG.error(ex) - raise ex - - def removeProject(self, project, destroy=False): - project = self.projects[project.getId()] - - if project is None: - return - - try: - if destroy: - quota = project.getQuota() - - ids = [] - ids.extend(quota.getServers("active", private=False)) - ids.extend(quota.getServers("pending", private=False)) - ids.extend(quota.getServers("error", private=False)) + ids = [] + ids.extend(quota.getServers("active", private=False)) + ids.extend(quota.getServers("building", private=False)) + ids.extend(quota.getServers("error", private=False)) + try: for server_id in ids: self.nova_manager.deleteServer(server_id) - - del self.projects[project.getId()] - except Exception as ex: - LOG.error(ex) - raise ex + except SynergyError as ex: + LOG.error(ex) + raise ex def deleteExpiredServers(self): - for prj_id, project in self.getProjects().items(): + for prj_id, project in self.project_manager.getProjects().items(): TTL = project.getTTL() quota = project.getQuota() @@ -217,7 +209,7 @@ class QuotaManager(Manager): % (uuid, TTL, state, prj_id)) self.nova_manager.deleteServer(server) - except Exception as ex: + except SynergyError as ex: LOG.error(ex) raise ex @@ -252,7 +244,7 @@ class QuotaManager(Manager): domain = self.keystone_manager.getDomains(name="default") if not domain: - raise Exception("domain 'default' not found!") + raise SynergyError("domain 'default' not found!") domain = domain[0] dom_id = domain.getId() @@ -260,7 +252,7 @@ class QuotaManager(Manager): kprojects = self.keystone_manager.getProjects(domain_id=dom_id) for kproject in kprojects: - project = self.getProject(kproject.getId()) + project = self.project_manager.getProject(id=kproject.getId()) if project: quota = self.nova_manager.getQuota(project.getId(), @@ -298,7 +290,7 @@ class QuotaManager(Manager): enabled = False if total_vcpus < static_vcpus: - if self.getProjects(): + if self.project_manager.getProjects(): LOG.warn("shared quota: the total statically " "allocated vcpus (%s) is greater than the " "total amount of vcpus allowed (%s)" @@ -307,7 +299,7 @@ class QuotaManager(Manager): shared_vcpus = total_vcpus - static_vcpus if total_memory < static_memory: - if self.getProjects(): + if self.project_manager.getProjects(): LOG.warn("shared quota: the total statically " "allocated memory (%s) is greater than " "the total amount of memory allowed (%s)" @@ -330,10 +322,10 @@ class QuotaManager(Manager): SharedQuota.setSize("vcpus", 0) SharedQuota.setSize("memory", 0) - for project in self.getProjects().values(): + for project in self.project_manager.getProjects().values(): quota = project.getQuota() quota.setSize("vcpus", shared_vcpus, private=False) quota.setSize("memory", shared_memory, private=False) - except Exception as ex: + except SynergyError as ex: LOG.error(ex) raise ex diff --git a/synergy_scheduler_manager/scheduler_manager.py b/synergy_scheduler_manager/scheduler_manager.py index f9dc7f6..f3170f7 100644 --- a/synergy_scheduler_manager/scheduler_manager.py +++ b/synergy_scheduler_manager/scheduler_manager.py @@ -1,5 +1,4 @@ -import logging -import re +import logging from common.flavor import Flavor from common.quota import SharedQuota @@ -7,6 +6,7 @@ from common.request import Request from common.server import Server from oslo_config import cfg from synergy.common.manager import Manager +from synergy.exception import SynergyError from threading import Thread @@ -153,7 +153,7 @@ class Notifications(object): class Worker(Thread): - def __init__(self, name, queue, projects, nova_manager, + def __init__(self, name, queue, project_manager, nova_manager, keystone_manager, backfill_depth=100): super(Worker, self).__init__() self.setDaemon(True) @@ -161,7 +161,7 @@ class Worker(Thread): self.name = name self.backfill_depth = backfill_depth self.queue = queue - self.projects = projects + self.project_manager = project_manager self.nova_manager = nova_manager self.keystone_manager = keystone_manager self.exit = False @@ -175,7 +175,7 @@ class Worker(Thread): self.queue.close() self.exit = True - except Exception as ex: + except SynergyError as ex: LOG.error(ex) raise ex @@ -223,14 +223,19 @@ class Worker(Thread): # or server["OS-EXT-STS:task_state"] != "scheduling": self.queue.deleteItem(queue_item) continue - except Exception as ex: + except SynergyError as ex: LOG.warn("the server %s is not anymore available!" - "(reason=%s)" % (server_id, ex)) - self.queue.deleteItem(queue_item) + " (reason=%s)" % (server_id, ex)) + self.queue.delete(queue_item) continue - quota = self.projects[prj_id].getQuota() + project = self.project_manager.getProject(id=prj_id) + + if not project: + raise SynergyError("project %r not found!" % prj_id) + + quota = project.getQuota() blocking = False if server.isEphemeral() and not SharedQuota.isEnabled(): @@ -246,7 +251,7 @@ class Worker(Thread): context["auth_token"] = token.getId() context["user_id"] = token.getUser().getId() - except Exception as ex: + except SynergyError as ex: LOG.error("error on getting the token for server " "%s (reason=%s)" % (server.getId(), ex)) raise ex @@ -258,7 +263,7 @@ class Worker(Thread): "ta=shared)" % (server_id, user_id, prj_id)) found = True - except Exception as ex: + except SynergyError as ex: LOG.error("error on building the server %s (reason=%s)" % (server.getId(), ex)) @@ -270,7 +275,7 @@ class Worker(Thread): else: queue_items.append(queue_item) - except Exception as ex: + except SynergyError as ex: LOG.error("Exception has occured", exc_info=1) LOG.error("Worker %s: %s" % (self.name, ex)) @@ -287,35 +292,34 @@ class SchedulerManager(Manager): self.config_opts = [ cfg.StrOpt("notification_topic", default="notifications"), cfg.IntOpt("backfill_depth", default=100), - cfg.FloatOpt("default_TTL", default=10.0), - cfg.ListOpt("projects", default=[], help="the projects list"), - cfg.ListOpt("shares", default=[], help="the shares list"), - cfg.ListOpt("TTLs", default=[], help="the TTLs list") ] self.workers = [] def setup(self): if self.getManager("NovaManager") is None: - raise Exception("NovaManager not found!") + raise SynergyError("NovaManager not found!") if self.getManager("QueueManager") is None: - raise Exception("QueueManager not found!") + raise SynergyError("QueueManager not found!") if self.getManager("QuotaManager") is None: - raise Exception("QuotaManager not found!") + raise SynergyError("QuotaManager not found!") if self.getManager("KeystoneManager") is None: - raise Exception("KeystoneManager not found!") + raise SynergyError("KeystoneManager not found!") if self.getManager("FairShareManager") is None: - raise Exception("FairShareManager not found!") + raise SynergyError("FairShareManager not found!") + + if self.getManager("ProjectManager") is None: + raise SynergyError("ProjectManager not found!") self.nova_manager = self.getManager("NovaManager") self.queue_manager = self.getManager("QueueManager") self.quota_manager = self.getManager("QuotaManager") self.keystone_manager = self.getManager("KeystoneManager") self.fairshare_manager = self.getManager("FairShareManager") - self.default_TTL = float(CONF.SchedulerManager.default_TTL) + self.project_manager = self.getManager("ProjectManager") self.backfill_depth = CONF.SchedulerManager.backfill_depth self.notification_topic = CONF.SchedulerManager.notification_topic self.projects = {} @@ -323,129 +327,18 @@ class SchedulerManager(Manager): self.exit = False self.configured = False - def parseAttribute(self, attribute): - if attribute is None: - return None - - parsed_attribute = re.split('=', attribute) - - if len(parsed_attribute) > 1: - if not parsed_attribute[-1].isdigit(): - raise Exception("wrong value %r found in %r!" - % (parsed_attribute[-1], parsed_attribute)) - - if len(parsed_attribute) == 2: - prj_name = parsed_attribute[0] - value = float(parsed_attribute[1]) - else: - raise Exception("wrong attribute definition: %r" - % parsed_attribute) - else: - raise Exception("wrong attribute definition: %r" - % parsed_attribute) - - return (prj_name, value) - def execute(self, command, *args, **kargs): - if command == "show": - usr_id = kargs.get("user_id", None) - usr_name = kargs.get("user_name", None) - all_users = kargs.get("all_users", False) - all_projects = kargs.get("all_projects", False) - prj_id = kargs.get("project_id", None) - prj_name = kargs.get("project_name", None) - project = None - - if all_projects: - return self.projects.values() - - if (usr_id is not None or usr_name is not None or all_users) and \ - prj_id is None and prj_name is None: - raise Exception("project id or name not defined!") - - if prj_id: - project = self.projects.get(prj_id, None) - - if not project: - raise Exception("project %s not found!" % prj_id) - elif prj_name: - for prj in self.projects.values(): - if prj_name == prj.getName(): - project = prj - break - - if not project: - raise Exception("project %r not found!" % prj_name) - elif not all_users: - return self.projects.values() - - if usr_id or usr_name: - return project.getUser(id=usr_id, name=usr_name) - elif all_users: - return project.getUsers() - else: - return project - else: - raise Exception("command=%r not supported!" % command) + raise SynergyError("command %r not supported!" % command) def task(self): if self.configured: return - domain = self.keystone_manager.getDomains(name="default") - if not domain: - raise Exception("domain 'default' not found!") - - domain = domain[0] - dom_id = domain.getId() - - for project in self.keystone_manager.getProjects(domain_id=dom_id): - if project.getName() in CONF.SchedulerManager.projects: - CONF.SchedulerManager.projects.remove(project.getName()) - project.setTTL(self.default_TTL) - - self.projects[project.getName()] = project - else: - quota = self.nova_manager.getQuota(project.getId()) - - if quota.getSize("vcpus") <= -1 and \ - quota.getSize("memory") <= -1 and \ - quota.getSize("instances") <= -1: - - qc = self.nova_manager.getQuota(project.getId(), - is_class=True) - - self.nova_manager.updateQuota(qc) - - if len(CONF.SchedulerManager.projects) > 0: - raise Exception("projects %s not found, please check the syn" - "ergy.conf" % CONF.SchedulerManager.projects) - self.quota_manager.updateSharedQuota() - for prj_ttl in CONF.SchedulerManager.TTLs: - prj_name, TTL = self.parseAttribute(prj_ttl) - self.projects[prj_name].setTTL(TTL) - - for prj_share in CONF.SchedulerManager.shares: - prj_name, share_value = self.parseAttribute(prj_share) - p_share = self.projects[prj_name].getShare() - p_share.setValue(share_value) - - for prj_name, project in self.projects.items(): - del self.projects[prj_name] - self.projects[project.getId()] = project - - self.quota_manager.addProject(project) - self.fairshare_manager.addProject(project) - - self.quota_manager.updateSharedQuota() - self.fairshare_manager.checkUsers() - self.fairshare_manager.calculateFairShare() - try: self.dynamic_queue = self.queue_manager.createQueue("DYNAMIC") - except Exception as ex: + except SynergyError as ex: LOG.error("Exception has occured", exc_info=1) LOG.error(ex) @@ -453,7 +346,7 @@ class SchedulerManager(Manager): dynamic_worker = Worker("DYNAMIC", self.dynamic_queue, - self.projects, + self.project_manager, self.nova_manager, self.keystone_manager, self.backfill_depth) @@ -482,11 +375,10 @@ class SchedulerManager(Manager): def processRequest(self, request): server = request.getServer() - try: - if request.getProjectId() in self.projects: - self.nova_manager.setQuotaTypeServer(server) + project = self.project_manager.getProject(id=request.getProjectId()) - project = self.projects[request.getProjectId()] + try: + if project: quota = project.getQuota() retry = request.getRetry() num_attempts = 0 @@ -565,6 +457,6 @@ class SchedulerManager(Manager): priority)) else: self.nova_manager.buildServer(request) - except Exception as ex: + except SynergyError as ex: LOG.error("Exception has occured", exc_info=1) LOG.error(ex) diff --git a/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py b/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py index a7b7b2e..c7a297f 100755 --- a/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py +++ b/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py @@ -17,9 +17,7 @@ from mock import patch from synergy_scheduler_manager.common.project import Project from synergy_scheduler_manager.common.user import User from synergy_scheduler_manager.fairshare_manager import FairShareManager -from synergy_scheduler_manager.keystone_manager import KeystoneManager -from synergy_scheduler_manager.queue_manager import QueueManager -from synergy_scheduler_manager.quota_manager import QuotaManager +from synergy_scheduler_manager.project_manager import ProjectManager from synergy_scheduler_manager.tests.unit import base @@ -27,92 +25,32 @@ class TestFairshareManager(base.TestCase): def setUp(self): super(TestFairshareManager, self).setUp() - self.fsmanager = FairShareManager() + self.fairshare_manager = FairShareManager() + self.project_manager = ProjectManager() # NOTE(vincent): we cannot import NovaManager in our tests. # NovaManager depends on the "nova" package (not novaclient), but it is # not available on PyPI so the test runner will fail to install it. nova_manager_mock = MagicMock() - self.fsmanager.managers = { + self.fairshare_manager.managers = { 'NovaManager': nova_manager_mock(), - 'QueueManager': QueueManager(), - 'QuotaManager': QuotaManager(), - 'KeystoneManager': KeystoneManager()} + 'ProjectManager': self.project_manager} # Mock the configuration since it is initiliazed by synergy-service. with patch('synergy_scheduler_manager.fairshare_manager.CONF'): - self.fsmanager.setup() - - def test_add_project(self): - project = Project() - project.setId(1) - project.setName("test_project") - prj_share = project.getShare() - prj_share.setValue(5) - self.fsmanager.addProject(project) - - self.assertEqual(1, self.fsmanager.projects[1].getId()) - self.assertEqual("test_project", self.fsmanager.projects[1].getName()) - self.assertEqual([], self.fsmanager.projects[1].getUsers()) - self.assertEqual(5, self.fsmanager.projects[1].getShare().getValue()) - - def test_add_project_no_share(self): - project = Project() - project.setId(1) - project.setName("test_project") - self.fsmanager.addProject(project) - - self.assertEqual(1, self.fsmanager.projects[1].getId()) - self.assertEqual("test_project", self.fsmanager.projects[1].getName()) - self.assertEqual([], self.fsmanager.projects[1].getUsers()) - self.assertEqual(self.fsmanager.default_share, - self.fsmanager.projects[1].getShare().getValue()) - - def test_get_project(self): - project = Project() - project.setId(1) - project.setName("test_project") - self.fsmanager.addProject(project) - - self.assertEqual(project, self.fsmanager.getProject(1)) - - def test_get_projects(self): - project1 = Project() - project1.setId(1) - project1.setName("test1") - self.fsmanager.addProject(project1) - - project2 = Project() - project2.setId(2) - project2.setName("test2") - self.fsmanager.addProject(project2) - - expected_projects = { - 1: project1, - 2: project2} - self.assertEqual(expected_projects, self.fsmanager.getProjects()) - - def test_remove_project(self): - project = Project() - project.setId(1) - project.setName("test_project") - self.fsmanager.addProject(project) - - self.assertIn(1, self.fsmanager.projects) - self.fsmanager.removeProject(1) - self.assertNotIn(1, self.fsmanager.projects) + self.fairshare_manager.setup() def test_calculate_priority_one_user(self): - # self.fsmanager.addProject(prj_id=1, prj_name="test") + # self.fairshare_manager.addProject(prj_id=1, prj_name="test") project = Project() project.setId(1) project.setName("test_project") # Define values used for computing the priority - age_weight = self.fsmanager.age_weight = 1.0 - vcpus_weight = self.fsmanager.vcpus_weight = 2.0 - memory_weight = self.fsmanager.memory_weight = 3.0 + age_weight = self.fairshare_manager.age_weight = 1.0 + vcpus_weight = self.fairshare_manager.vcpus_weight = 2.0 + memory_weight = self.fairshare_manager.memory_weight = 3.0 datetime_start = datetime(year=2000, month=1, day=1, hour=0, minute=0) datetime_stop = datetime(year=2000, month=1, day=1, hour=2, minute=0) minutes = (datetime_stop - datetime_start).seconds / 60 @@ -128,7 +66,7 @@ class TestFairshareManager(base.TestCase): priority.setFairShare('memory', fairshare_ram) project.addUser(user) - self.fsmanager.addProject(project) + self.project_manager.projects[project.getId()] = project # Compute the expected priority given the previously defined values expected_priority = int(age_weight * minutes + @@ -138,7 +76,8 @@ class TestFairshareManager(base.TestCase): with patch("synergy_scheduler_manager.fairshare_manager.datetime") \ as datetime_mock: datetime_mock.utcnow.side_effect = (datetime_start, datetime_stop) - priority = self.fsmanager.calculatePriority(user_id=22, prj_id=1) + priority = self.fairshare_manager.calculatePriority( + user.getId(), project.getId()) self.assertEqual(expected_priority, priority) diff --git a/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py b/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py index 6924b7b..499ded6 100644 --- a/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py +++ b/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py @@ -19,6 +19,7 @@ from synergy_scheduler_manager.common.queue import QueueDB from synergy_scheduler_manager.common.queue import QueueItem from synergy_scheduler_manager.common.quota import SharedQuota from synergy_scheduler_manager.common.server import Server +from synergy_scheduler_manager.project_manager import ProjectManager from synergy_scheduler_manager.scheduler_manager import Notifications from synergy_scheduler_manager.scheduler_manager import Worker from synergy_scheduler_manager.tests.unit import base @@ -130,19 +131,36 @@ class TestWorker(base.TestCase): self.keystone_manager_mock = MagicMock() db_engine_mock = create_autospec(Engine) - project1 = Project() - project1.setId("1") - project1.setName("test1") + def my_side_effect(*args, **kwargs): + project1 = Project() + project1.setId(1) + project1.setName("test_project1") + project1.getShare().setValue(5) - project2 = Project() - project2.setId("2") - project2.setName("test2") + project2 = Project() + project2.setId(2) + project2.setName("test_project2") + project2.getShare().setValue(55) + + if args[0] == 1: + return project1 + elif args[0] == 2: + return project2 + + keystone_manager = MagicMock() + keystone_manager.getProject.side_effect = my_side_effect + + self.project_manager = ProjectManager() + self.project_manager.db_engine = MagicMock() + self.project_manager.keystone_manager = keystone_manager + self.project_manager.default_TTL = 10 + self.project_manager.default_share = 30 + self.project_manager._addProject(1, "test_project1", 10, 50) - projects_list = {'1': project1, '2': project2} self.worker = Worker( name="test", queue=QueueDB("testq", db_engine_mock), - projects=projects_list, + project_manager=self.project_manager, nova_manager=self.nova_manager_mock, keystone_manager=self.keystone_manager_mock) @@ -181,11 +199,11 @@ class TestWorker(base.TestCase): nova_exec = self.nova_manager_mock.execute nova_exec.side_effect = nova_exec_side_effect + project = self.project_manager.getProject(id=1) # Mock quota allocation - quota_allocate_mock = create_autospec( - self.worker.projects['1'].getQuota().allocate) + quota_allocate_mock = create_autospec(project.getQuota().allocate) quota_allocate_mock.return_value = True - self.worker.projects['1'].getQuota().allocate = quota_allocate_mock + project.getQuota().allocate = quota_allocate_mock # Delete item from the queue delete_item_mock = create_autospec(self.worker.queue.deleteItem)