freezer scheduler

The freezer scheduler is to be executed
as daemon process on the client machines

It has the following responsibilities:

  * when using the api:
    - register -if necessary- as a client in the api
    - download the list of jobs from the api
    - schedule the jobs for execution
    - launch the freezer client at the scheduled time
    - collect metadata and exit codes and upload them to the api
    - periodically poll the api for new/updated jobs
    - if a job is part of a session (a coordinated group of jobs)
      it updates the session status when job starts/stops

  * when not using the api
    - load jobs configurations from files
    - schedule the jobs for execution
    - launch the freezer client at the scheduled time

The freezer scheduler can also be used to manage jobs
and sessions using the following positional parameters:

  job-list
  job-get
  job-create
  job-delete
  job-start
  job-stop
  session-list
  session-get
  session-create
  session-delete
  session-list-job
  session-add-job
  session-remove-job

or to register the client in the api using the positional parameter:

  register

Implements blueprint: freezer-scheduler-start

Change-Id: I06ae202a0f464f7240c137744a5b54d1177cabd9
This commit is contained in:
Fabrizio Vanni 2015-06-08 12:05:08 +01:00 committed by Fausto Marzi
parent 5d75e3c789
commit 58988fc3e3
4 changed files with 250 additions and 0 deletions

80
actions.py Normal file
View File

@ -0,0 +1,80 @@
"""
Copyright 2015 Hewlett-Packard
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This product includes cryptographic software written by Eric Young
(eay@cryptsoft.com). This product includes software written by Tim
Hudson (tjh@cryptsoft.com).
========================================================================
"""
import json
import requests
import exceptions
class ActionManager(object):
def __init__(self, client):
self.client = client
self.endpoint = self.client.endpoint + '/v1/actions/'
@property
def headers(self):
return {'X-Auth-Token': self.client.auth_token}
def create(self, doc, action_id=''):
action_id = action_id or doc.get('action_id', '')
endpoint = self.endpoint + action_id
r = requests.post(endpoint,
data=json.dumps(doc),
headers=self.headers)
if r.status_code != 201:
raise exceptions.ApiClientException(r)
action_id = r.json()['action_id']
return action_id
def delete(self, action_id):
endpoint = self.endpoint + action_id
r = requests.delete(endpoint, headers=self.headers)
if r.status_code != 204:
raise exceptions.ApiClientException(r)
def list(self, limit=10, offset=0, search=None):
data = json.dumps(search) if search else None
query = {'limit': int(limit), 'offset': int(offset)}
r = requests.get(self.endpoint, headers=self.headers,
params=query, data=data)
if r.status_code != 200:
raise exceptions.ApiClientException(r)
return r.json()['actions']
def get(self, action_id):
endpoint = self.endpoint + action_id
r = requests.get(endpoint, headers=self.headers)
if r.status_code == 200:
return r.json()
if r.status_code == 404:
return None
raise exceptions.ApiClientException(r)
def update(self, action_id, update_doc):
endpoint = self.endpoint + action_id
r = requests.patch(endpoint,
headers=self.headers,
data=json.dumps(update_doc))
if r.status_code != 200:
raise exceptions.ApiClientException(r)
return r.json()['version']

View File

@ -26,6 +26,8 @@ from openstackclient.identity import client as os_client
from backups import BackupsManager
from registration import RegistrationManager
from jobs import JobManager
from actions import ActionManager
from sessions import SessionManager
import exceptions
@ -64,6 +66,8 @@ class Client(object):
self.backups = BackupsManager(self)
self.registration = RegistrationManager(self)
self.jobs = JobManager(self)
self.actions = ActionManager(self)
self.sessions = SessionManager(self)
@cached_property
def endpoint(self):

View File

@ -67,4 +67,8 @@ class ApiClientException(Exception):
message = self.get_message_from_api_response(r) or \
self.get_message_from_response(r) or \
str(r)
try:
self.status_code = r.status_code
except:
self.status_code = None
super(ApiClientException, self).__init__(message)

162
sessions.py Normal file
View File

@ -0,0 +1,162 @@
"""
Copyright 2015 Hewlett-Packard
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This product includes cryptographic software written by Eric Young
(eay@cryptsoft.com). This product includes software written by Tim
Hudson (tjh@cryptsoft.com).
========================================================================
"""
import json
import requests
import exceptions
class SessionManager(object):
def __init__(self, client):
self.client = client
self.endpoint = self.client.endpoint + '/v1/sessions/'
@property
def headers(self):
return {'X-Auth-Token': self.client.auth_token}
def create(self, doc, session_id=''):
session_id = session_id or doc.get('session_id', '')
endpoint = self.endpoint + session_id
r = requests.post(endpoint,
data=json.dumps(doc),
headers=self.headers)
if r.status_code != 201:
raise exceptions.ApiClientException(r)
session_id = r.json()['session_id']
return session_id
def delete(self, session_id):
endpoint = self.endpoint + session_id
r = requests.delete(endpoint, headers=self.headers)
if r.status_code != 204:
raise exceptions.ApiClientException(r)
def list_all(self, limit=10, offset=0, search=None):
data = json.dumps(search) if search else None
query = {'limit': int(limit), 'offset': int(offset)}
r = requests.get(self.endpoint, headers=self.headers,
params=query, data=data)
if r.status_code != 200:
raise exceptions.ApiClientException(r)
return r.json()['sessions']
def list(self, limit=10, offset=0, search={}):
new_search = search.copy()
new_search['match'] = search.get('match', [])
return self.list_all(limit, offset, new_search)
def get(self, session_id):
endpoint = self.endpoint + session_id
r = requests.get(endpoint, headers=self.headers)
if r.status_code == 200:
return r.json()
if r.status_code == 404:
return None
raise exceptions.ApiClientException(r)
def update(self, session_id, update_doc):
endpoint = self.endpoint + session_id
r = requests.patch(endpoint,
headers=self.headers,
data=json.dumps(update_doc))
if r.status_code != 200:
raise exceptions.ApiClientException(r)
return r.json()['version']
def add_job(self, session_id, job_id):
# endpoint /v1/sessions/{sessions_id}/jobs/{job_id}
endpoint = '{0}{1}/jobs/{2}'.format(self.endpoint, session_id, job_id)
r = requests.put(endpoint,
headers=self.headers)
if r.status_code != 204:
raise exceptions.ApiClientException(r)
return
def remove_job(self, session_id, job_id):
# endpoint /v1/sessions/{sessions_id}/jobs/{job_id}
endpoint = '{0}{1}/jobs/{2}'.format(self.endpoint, session_id, job_id)
retry = 5
r = ''
while retry:
r = requests.delete(endpoint,
headers=self.headers)
if r.status_code == 204:
return
retry -= 1
raise exceptions.ApiClientException(r)
def start_session(self, session_id, job_id, session_tag):
"""
Informs the api that the client is starting the session
identified by the session_id and request the session_tag
to be incremented up to the requested value.
The returned session_id could be:
* current_tag + 1 if the session has started
* > current_tag + 1 if the action had already been started
by some other node and this node was out of sync
:param session_id:
:param job_id:
:param session_tag: the new session_id
:return: the response obj:
{ result: string 'running' or 'error',
'session_tag': the new session_tag )
"""
# endpoint /v1/sessions/{sessions_id}/action
endpoint = '{0}{1}/action'.format(self.endpoint, session_id)
doc = {"start": {
"job_id": job_id,
"current_tag": session_tag
}}
r = requests.post(endpoint,
headers=self.headers,
data=json.dumps(doc))
if r.status_code != 202:
raise exceptions.ApiClientException(r)
return r.json()
def end_session(self, session_id, job_id, session_tag, result):
"""
Informs the freezer service that the job has ended.
Privides information about the job's result and the session tag
:param session_id:
:param job_id:
:param session_tag:
:param result:
:return:
"""
# endpoint /v1/sessions/{sessions_id}/action
endpoint = '{0}{1}/action'.format(self.endpoint, session_id)
doc = {"end": {
"job_id": job_id,
"current_tag": session_tag,
"result": result
}}
r = requests.post(endpoint,
headers=self.headers,
data=json.dumps(doc))
if r.status_code != 202:
raise exceptions.ApiClientException(r)
return r.json()