import logging

from common.quota import SharedQuota
from common.request import Request
from oslo_config import cfg
from synergy.common.manager import Manager
from synergy.exception import SynergyError
from threading import Thread

__author__ = "Lisa Zangrando"
__email__ = "lisa.zangrando[AT]pd.infn.it"
__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud
All Rights Reserved

Licensed under the Apache License, Version 2.0;
you may not use this file except in compliance with the
License. You may obtain a copy of the License at:

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied.
See the License for the specific language governing
permissions and limitations under the License."""

CONF = cfg.CONF
LOG = logging.getLogger(__name__)


class Worker(Thread):
    """Daemon thread that schedules queued requests on the shared quota.

    Each item taken from ``queue`` holds a serialized ``Request`` for a
    server that must still be in the "building" state. The worker tries to
    allocate the project quota for it; on success it obtains a token through
    the trust stored in the request context and asks Nova to build the
    server. Items whose quota cannot be allocated yet are kept aside (at most
    ``backfill_depth`` of them) and re-inserted into the queue whenever the
    shared quota reports a newer release time.
    """

    def __init__(self, name, queue, project_manager, nova_manager,
                 keystone_manager, backfill_depth=100):
        super(Worker, self).__init__()
        # Thread.setDaemon() is deprecated (Python 3.10); the ``daemon``
        # attribute is the supported equivalent.
        self.daemon = True

        self.name = name
        self.backfill_depth = backfill_depth
        self.queue = queue
        self.project_manager = project_manager
        self.nova_manager = nova_manager
        self.keystone_manager = keystone_manager
        self.exit = False
        LOG.info("Worker %s created!" % self.name)

    def getName(self):
        """Return the worker name given at construction time."""
        return self.name

    def destroy(self):
        """Close the queue and ask the run loop to terminate.

        :raises SynergyError: if the queue cannot be closed.
        """
        try:
            self.queue.close()
            self.exit = True
        except SynergyError as ex:
            LOG.error(ex)
            raise ex

    def run(self):
        """Schedule queued requests until the worker or its queue stops."""
        LOG.info("Worker %s running!" % self.name)

        # Items whose quota allocation failed (or whose build failed); they
        # are put back into the queue once the shared quota is released.
        queue_items = []
        last_release_time = SharedQuota.getLastReleaseTime()

        while not self.exit and not self.queue.isClosed():
            release_time = SharedQuota.getLastReleaseTime()

            if last_release_time < release_time:
                last_release_time = release_time

                while queue_items:
                    self.queue.reinsertItem(queue_items.pop(0))

            if len(queue_items) >= self.backfill_depth:
                # Backfill list is full: wait for the shared quota to change
                # instead of pulling more items from the queue.
                SharedQuota.wait()
                continue

            queue_item = self._nextItem()

            if queue_item is None:
                continue

            self._processItem(queue_item, queue_items)

        LOG.info("Worker %s destroyed!" % self.name)

    def _nextItem(self):
        """Return the next queue item, or None to restart the run loop.

        A non-blocking read is tried first. If it returns nothing while the
        queue still reports a non-zero size, the worker waits on the shared
        quota and returns None; if the queue is empty it blocks until an
        item arrives.
        """
        queue_item = self.queue.getItem(blocking=False)

        if queue_item is None:
            if self.queue.getSize():
                SharedQuota.wait()
                return None

            queue_item = self.queue.getItem(blocking=True)

        return queue_item

    def _processItem(self, queue_item, queue_items):
        """Try to schedule the request held by ``queue_item``.

        On a successful build the item is deleted from the queue. If the
        quota cannot be allocated, or the build fails, the item is appended
        to ``queue_items`` for a later retry. Items for servers that are
        gone or no longer "building" are deleted, as are items whose
        processing raises an error.
        """
        try:
            request = Request.fromDict(queue_item.getData())
            user_id = request.getUserId()
            prj_id = request.getProjectId()
            context = request.getContext()
            server = request.getServer()
            server_id = server.getId()

            # Only the Nova lookup is guarded here, so that errors raised by
            # the queue itself are not reported as a missing server.
            try:
                s = self.nova_manager.getServer(server_id, detail=True)
            except SynergyError as ex:
                LOG.warning("the server %s is not anymore available!"
                            " (reason=%s)" % (server_id, ex))
                self.queue.deleteItem(queue_item)
                return

            if s.getState() != "building":
                # or server["OS-EXT-STS:task_state"] != "scheduling":
                self.queue.deleteItem(queue_item)
                return

            project = self.project_manager.getProject(id=prj_id)

            if not project:
                raise SynergyError("project %r not found!" % prj_id)

            quota = project.getQuota()
            blocking = bool(server.isEphemeral() and
                            not SharedQuota.isEnabled())

            if not quota.allocate(server, blocking=blocking):
                queue_items.append(queue_item)
                return

            try:
                self._setTrustToken(context)
            except Exception as ex:
                LOG.error("error on getting the token for server "
                          "%s (reason=%s)" % (server_id, ex))
                # The server will not be built and the item is dropped by
                # the handler below, so return the quota allocated above.
                quota.release(server)
                raise

            try:
                self.nova_manager.buildServer(request)
            except SynergyError as ex:
                LOG.error("error on building the server %s (reason=%s)"
                          % (server_id, ex))
                quota.release(server)
                queue_items.append(queue_item)
            else:
                LOG.info("building server %s user_id=%s prj_id=%s quo"
                         "ta=shared" % (server_id, user_id, prj_id))
                self.queue.deleteItem(queue_item)
        except Exception as ex:
            # Per-item boundary of the worker thread: an unexpected error
            # must not terminate the thread, which would stall the queue.
            LOG.error("Exception has occured", exc_info=True)
            LOG.error("Worker %s: %s" % (self.name, ex))
            self.queue.deleteItem(queue_item)

    def _setTrustToken(self, context):
        """Fill ``context`` with a token obtained via its trust.

        Looks up the trust ``context["trust_id"]``, gets a token from it
        using the manager's own token, then stores the token id and its
        user id in ``context["auth_token"]`` and ``context["user_id"]``.
        """
        km = self.keystone_manager
        trust = km.getTrust(context["trust_id"])
        token = trust.getToken(km.getToken().getId())

        context["auth_token"] = token.getId()
        context["user_id"] = token.getUser().getId()


class SchedulerManager(Manager):
    """Manager that admits new server requests against project quotas.

    Permanent servers are checked against the project's private quota and
    are built (or deleted) immediately. Other servers get a fair-share
    priority and are inserted into the "DYNAMIC" queue, which is drained by
    a ``Worker`` thread using the shared quota.
    """

    def __init__(self):
        super(SchedulerManager, self).__init__("SchedulerManager")

        self.config_opts = [
            cfg.IntOpt("backfill_depth", default=100),
        ]
        self.workers = []

    def setup(self):
        """Resolve the required managers and read the configuration.

        :raises SynergyError: if a required manager is not available.
        """
        required = ("NovaManager", "QueueManager", "QuotaManager",
                    "KeystoneManager", "FairShareManager", "ProjectManager")

        for manager_name in required:
            if self.getManager(manager_name) is None:
                raise SynergyError("%s not found!" % manager_name)

        self.nova_manager = self.getManager("NovaManager")
        self.queue_manager = self.getManager("QueueManager")
        self.quota_manager = self.getManager("QuotaManager")
        self.keystone_manager = self.getManager("KeystoneManager")
        self.fairshare_manager = self.getManager("FairShareManager")
        self.project_manager = self.getManager("ProjectManager")
        self.backfill_depth = CONF.SchedulerManager.backfill_depth
        self.exit = False
        self.configured = False

    def execute(self, command, *args, **kargs):
        """Reject every command; this manager does not expose any.

        :raises SynergyError: always.
        """
        raise SynergyError("command %r not supported!" % command)

    def task(self):
        """Perform the one-time runtime configuration.

        On the first call this updates the shared quota, creates (or reuses)
        the "DYNAMIC" queue, starts its ``Worker`` thread and deletes expired
        servers. Subsequent calls return immediately.
        """
        if self.configured:
            return

        self.quota_manager.updateSharedQuota()

        try:
            self.dynamic_queue = self.queue_manager.createQueue("DYNAMIC")
        except SynergyError as ex:
            # NOTE(review): presumably raised when the queue already exists;
            # fall back to fetching it.
            LOG.error("Exception has occured", exc_info=True)
            LOG.error(ex)

            self.dynamic_queue = self.queue_manager.getQueue("DYNAMIC")

        dynamic_worker = Worker("DYNAMIC", self.dynamic_queue,
                                self.project_manager, self.nova_manager,
                                self.keystone_manager, self.backfill_depth)
        dynamic_worker.start()
        self.workers.append(dynamic_worker)

        self.quota_manager.deleteExpiredServers()

        self.configured = True

    def destroy(self):
        """Stop every worker thread started by ``task``."""
        for queue_worker in self.workers:
            queue_worker.destroy()

    def doOnEvent(self, event_type, *args, **kwargs):
        """Dispatch "SERVER_EVENT" and "SERVER_CREATE"; ignore other events."""
        if event_type == "SERVER_EVENT":
            server = kwargs["server"]
            event = kwargs["event"]
            state = kwargs["state"]

            self._processServerEvent(server, event, state)
        elif event_type == "SERVER_CREATE":
            self._processServerCreate(kwargs["request"])

    def _processServerEvent(self, server, event, state):
        """Update quotas in reaction to a Nova server event.

        A "compute.instance.create.end" event in the "active" state is only
        logged. Otherwise, when the server's project is known, a
        "compute.instance.delete.end" event releases the server's quota,
        and an "error" state deletes the server (unless it is already
        terminated or deleted) and then releases its quota.
        """
        if event == "compute.instance.create.end" and state == "active":
            LOG.info("the server %s is now active on host %s"
                     % (server.getId(), server.getHost()))
            return

        project = self.project_manager.getProject(id=server.getProjectId())

        if not project:
            return

        quota = project.getQuota()

        if event == "compute.instance.delete.end":
            LOG.info("the server %s has been deleted on host %s"
                     % (server.getId(), server.getHost()))

            self._releaseQuota(quota, server)
        elif state == "error":
            LOG.info("error occurred on server %s (host %s)"
                     % (server.getId(), server.getHost()))

            if not server.getTerminatedAt() and not server.getDeletedAt():
                try:
                    self.nova_manager.deleteServer(server)
                except Exception as ex:
                    LOG.error("cannot delete server %s: %s"
                              % (server.getId(), ex))

            self._releaseQuota(quota, server)

    def _releaseQuota(self, quota, server):
        """Release ``server`` from ``quota``; failures are only logged."""
        try:
            quota.release(server)
        except Exception as ex:
            LOG.warning("cannot release server %s "
                        "(reason=%s)" % (server.getId(), ex))

    def _processServerCreate(self, request):
        """Admit a newly requested server.

        - Requests whose project is unknown to the ProjectManager are built
          directly.
        - Retried requests with 0 < num_attempts < 3 are rebuilt directly.
        - Permanent servers use the project's private quota: they are built
          if the quota can be allocated, otherwise they are deleted.
        - Any other server gets a fair-share priority, a trust id is stored
          in its context, and the request is inserted into the "DYNAMIC"
          queue to be scheduled on the shared quota.

        A SynergyError raised while doing so is logged, not propagated.
        """
        server = request.getServer()
        project = self.project_manager.getProject(id=request.getProjectId())

        try:
            if not project:
                self.nova_manager.buildServer(request)
                return

            quota = project.getQuota()
            retry = request.getRetry()
            num_attempts = 0
            reason = None

            if retry:
                num_attempts = retry.get("num_attempts", 0)
                reason = retry.get("exc_reason", "n/a")

            if 0 < num_attempts < 3:
                self.nova_manager.buildServer(request)

                LOG.info("retrying to build the server %s user_id"
                         "=%s prj_id=%s, num_attempts=%s, reason=%s"
                         % (request.getId(), request.getUserId(),
                            request.getProjectId(), num_attempts, reason))
                return

            if server.isPermanent():
                if quota.allocate(server, blocking=False):
                    LOG.info("new request: id=%s user_id=%s prj_id=%s "
                             "quota=private" % (request.getId(),
                                                request.getUserId(),
                                                request.getProjectId()))

                    self.nova_manager.buildServer(request)

                    LOG.info("building server %s user_id=%s prj_id=%s "
                             "quota=private" % (server.getId(),
                                                request.getUserId(),
                                                request.getProjectId()))
                else:
                    self.nova_manager.deleteServer(server)

                    LOG.info("request rejected (quota exceeded): "
                             "id=%s user_id=%s prj_id=%s "
                             "quota=private" % (request.getId(),
                                                request.getUserId(),
                                                request.getProjectId()))
                return

            priority = self.fairshare_manager.calculatePriority(
                user_id=request.getUserId(),
                prj_id=request.getProjectId(),
                timestamp=request.getCreatedAt(),
                retry=num_attempts)

            context = request.getContext()
            context["trust_id"] = self._findOrMakeTrust(context).getId()

            self.dynamic_queue.insertItem(request.getUserId(),
                                          request.getProjectId(),
                                          priority=priority,
                                          data=request.toDict())

            LOG.info("new request: id=%s user_id=%s prj_id=%s priority"
                     "=%s quota=shared" % (request.getId(),
                                           request.getUserId(),
                                           request.getProjectId(),
                                           priority))
        except SynergyError as ex:
            LOG.error("Exception has occured", exc_info=True)
            LOG.error(ex)

    def _findOrMakeTrust(self, context):
        """Return a trust of the request's user whose trustee is the admin.

        The user is identified by validating ``context["auth_token"]``; the
        admin is the user of the manager's own token. An existing matching
        trust is reused, otherwise a new one is created via ``makeTrust``.
        """
        km = self.keystone_manager
        token_user = km.validateToken(context["auth_token"])
        token_admin = km.getToken()
        admin_id = token_admin.getUser().getId()

        trusts = km.getTrusts(user_id=token_user.getUser().getId(),
                              token=token_user)

        for trust in trusts:
            if trust.getTrusteeUserId() == admin_id:
                return trust

        return km.makeTrust(admin_id, token_user)