From 58988fc3e33079fc43ced62ecefde7ba2a171147 Mon Sep 17 00:00:00 2001 From: Fabrizio Vanni Date: Mon, 8 Jun 2015 12:05:08 +0100 Subject: [PATCH] freezer scheduler The freezer scheduler is to be executed as daemon process on the client machines It has the following responsibilities: * when using the api: - register -if necessary- as a client in the api - download the list of jobs from the api - schedule the jobs for execution - launch the freezer client at the scheduled time - collect metadata and exit codes and upload them to the api - periodically poll the api for new/updated jobs - if a job is part of a session (a coordinated group of jobs) it updates the session status when job starts/stops * when not using the api - load jobs configurations from files - schedule the jobs for execution - launch the freezer client at the scheduled time The freezer scheduler can also be used to manage jobs and sessions using the following positional parameters: job-list job-get job-create job-delete job-start job-stop session-list session-get session-create session-delete session-list-job session-add-job session-remove-job or to register the client in the api using the positional parameter: register Implements blueprint: freezer-scheduler-start Change-Id: I06ae202a0f464f7240c137744a5b54d1177cabd9 --- actions.py | 80 +++++++++++++++++++++++++ client.py | 4 ++ exceptions.py | 4 ++ sessions.py | 162 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 250 insertions(+) create mode 100644 actions.py create mode 100644 sessions.py diff --git a/actions.py b/actions.py new file mode 100644 index 0000000..5d19fa2 --- /dev/null +++ b/actions.py @@ -0,0 +1,80 @@ +""" +Copyright 2015 Hewlett-Packard + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This product includes cryptographic software written by Eric Young +(eay@cryptsoft.com). This product includes software written by Tim +Hudson (tjh@cryptsoft.com). +======================================================================== +""" + +import json +import requests + +import exceptions + + +class ActionManager(object): + + def __init__(self, client): + self.client = client + self.endpoint = self.client.endpoint + '/v1/actions/' + + @property + def headers(self): + return {'X-Auth-Token': self.client.auth_token} + + def create(self, doc, action_id=''): + action_id = action_id or doc.get('action_id', '') + endpoint = self.endpoint + action_id + r = requests.post(endpoint, + data=json.dumps(doc), + headers=self.headers) + if r.status_code != 201: + raise exceptions.ApiClientException(r) + action_id = r.json()['action_id'] + return action_id + + def delete(self, action_id): + endpoint = self.endpoint + action_id + r = requests.delete(endpoint, headers=self.headers) + if r.status_code != 204: + raise exceptions.ApiClientException(r) + + def list(self, limit=10, offset=0, search=None): + data = json.dumps(search) if search else None + query = {'limit': int(limit), 'offset': int(offset)} + r = requests.get(self.endpoint, headers=self.headers, + params=query, data=data) + if r.status_code != 200: + raise exceptions.ApiClientException(r) + return r.json()['actions'] + + def get(self, action_id): + endpoint = self.endpoint + action_id + r = requests.get(endpoint, headers=self.headers) + if r.status_code == 200: + return r.json() + if r.status_code == 404: + return None + raise exceptions.ApiClientException(r) + + def update(self, action_id, update_doc): + endpoint = self.endpoint + action_id + r = requests.patch(endpoint, + headers=self.headers, + data=json.dumps(update_doc)) + if r.status_code != 200: + raise exceptions.ApiClientException(r) + return r.json()['version'] diff --git a/client.py b/client.py index 74b57e9..e96edfb 100644 --- a/client.py +++ b/client.py @@ -26,6 +26,8 @@ from openstackclient.identity import client as os_client from backups import BackupsManager from registration import RegistrationManager from jobs import JobManager +from actions import ActionManager +from sessions import SessionManager import exceptions @@ -64,6 +66,8 @@ class Client(object): self.backups = BackupsManager(self) self.registration = RegistrationManager(self) self.jobs = JobManager(self) + self.actions = ActionManager(self) + self.sessions = SessionManager(self) @cached_property def endpoint(self): diff --git a/exceptions.py b/exceptions.py index fea9930..c99a6b7 100644 --- a/exceptions.py +++ b/exceptions.py @@ -67,4 +67,8 @@ class ApiClientException(Exception): message = self.get_message_from_api_response(r) or \ self.get_message_from_response(r) or \ str(r) + try: + self.status_code = r.status_code + except: + self.status_code = None super(ApiClientException, self).__init__(message) diff --git a/sessions.py b/sessions.py new file mode 100644 index 0000000..4219be0 --- /dev/null +++ b/sessions.py @@ -0,0 +1,162 @@ +""" +Copyright 2015 Hewlett-Packard + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This product includes cryptographic software written by Eric Young +(eay@cryptsoft.com). This product includes software written by Tim +Hudson (tjh@cryptsoft.com). +======================================================================== +""" + +import json +import requests + +import exceptions + + +class SessionManager(object): + + def __init__(self, client): + self.client = client + self.endpoint = self.client.endpoint + '/v1/sessions/' + + @property + def headers(self): + return {'X-Auth-Token': self.client.auth_token} + + def create(self, doc, session_id=''): + session_id = session_id or doc.get('session_id', '') + endpoint = self.endpoint + session_id + r = requests.post(endpoint, + data=json.dumps(doc), + headers=self.headers) + if r.status_code != 201: + raise exceptions.ApiClientException(r) + session_id = r.json()['session_id'] + return session_id + + def delete(self, session_id): + endpoint = self.endpoint + session_id + r = requests.delete(endpoint, headers=self.headers) + if r.status_code != 204: + raise exceptions.ApiClientException(r) + + def list_all(self, limit=10, offset=0, search=None): + data = json.dumps(search) if search else None + query = {'limit': int(limit), 'offset': int(offset)} + r = requests.get(self.endpoint, headers=self.headers, + params=query, data=data) + if r.status_code != 200: + raise exceptions.ApiClientException(r) + return r.json()['sessions'] + + def list(self, limit=10, offset=0, search={}): + new_search = search.copy() + new_search['match'] = search.get('match', []) + return self.list_all(limit, offset, new_search) + + def get(self, session_id): + endpoint = self.endpoint + session_id + r = requests.get(endpoint, headers=self.headers) + if r.status_code == 200: + return r.json() + if r.status_code == 404: + return None + raise exceptions.ApiClientException(r) + + def update(self, session_id, update_doc): + endpoint = self.endpoint + session_id + r = requests.patch(endpoint, + headers=self.headers, + data=json.dumps(update_doc)) + if r.status_code != 200: + raise exceptions.ApiClientException(r) + return r.json()['version'] + + def add_job(self, session_id, job_id): + # endpoint /v1/sessions/{sessions_id}/jobs/{job_id} + endpoint = '{0}{1}/jobs/{2}'.format(self.endpoint, session_id, job_id) + r = requests.put(endpoint, + headers=self.headers) + if r.status_code != 204: + raise exceptions.ApiClientException(r) + return + + def remove_job(self, session_id, job_id): + # endpoint /v1/sessions/{sessions_id}/jobs/{job_id} + endpoint = '{0}{1}/jobs/{2}'.format(self.endpoint, session_id, job_id) + retry = 5 + r = '' + while retry: + r = requests.delete(endpoint, + headers=self.headers) + if r.status_code == 204: + return + retry -= 1 + raise exceptions.ApiClientException(r) + + def start_session(self, session_id, job_id, session_tag): + """ + Informs the api that the client is starting the session + identified by the session_id and request the session_tag + to be incremented up to the requested value. + The returned session_id could be: + * current_tag + 1 if the session has started + * > current_tag + 1 if the action had already been started + by some other node and this node was out of sync + + :param session_id: + :param job_id: + :param session_tag: the new session_id + :return: the response obj: + { result: string 'running' or 'error', + 'session_tag': the new session_tag ) + """ + # endpoint /v1/sessions/{sessions_id}/action + endpoint = '{0}{1}/action'.format(self.endpoint, session_id) + doc = {"start": { + "job_id": job_id, + "current_tag": session_tag + }} + r = requests.post(endpoint, + headers=self.headers, + data=json.dumps(doc)) + if r.status_code != 202: + raise exceptions.ApiClientException(r) + return r.json() + + def end_session(self, session_id, job_id, session_tag, result): + """ + Informs the freezer service that the job has ended. + Privides information about the job's result and the session tag + + :param session_id: + :param job_id: + :param session_tag: + :param result: + :return: + """ + # endpoint /v1/sessions/{sessions_id}/action + endpoint = '{0}{1}/action'.format(self.endpoint, session_id) + doc = {"end": { + "job_id": job_id, + "current_tag": session_tag, + "result": result + }} + r = requests.post(endpoint, + headers=self.headers, + data=json.dumps(doc)) + if r.status_code != 202: + raise exceptions.ApiClientException(r) + return r.json()