freezer/tests/test_scheduler_arguments.py
Fabrizio Vanni d4b9399e9b freezer scheduler
The freezer scheduler is to be executed
as daemon process on the client machines

It has the following responsibilities:

  * when using the api:
    - register -if necessary- as a client in the api
    - download the list of jobs from the api
    - schedule the jobs for execution
    - launch the freezer client at the scheduled time
    - collect metadata and exit codes and upload them to the api
    - periodically poll the api for new/updated jobs
    - if a job is part of a session (a coordinated group of jobs)
      it updates the session status when job starts/stops

  * when not using the api
    - load jobs configurations from files
    - schedule the jobs for execution
    - launch the freezer client at the scheduled time

The freezer scheduler can also be used to manage jobs
and sessions using the following positional parameters:

  job-list
  job-get
  job-create
  job-delete
  job-start
  job-stop
  session-list
  session-get
  session-create
  session-delete
  session-list-job
  session-add-job
  session-remove-job

or to register the client in the api using the positional parameter:

  register

Implements blueprint: freezer-scheduler-start

Change-Id: I06ae202a0f464f7240c137744a5b54d1177cabd9
2015-07-10 18:51:17 +01:00

85 lines
3.0 KiB
Python

# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
import unittest
from mock import Mock, patch
from freezer.scheduler import arguments
class TestOpenstackOptions(unittest.TestCase):
def setUp(self):
self.args = Mock()
self.args.os_username = 'janedoe'
self.args.os_tenant_name = 'hertenant'
self.args.os_auth_url = 'herauthurl'
self.args.os_password = 'herpassword'
self.args.os_tenant_id = 'hertenantid'
self.args.os_region_name = 'herregion'
self.args.os_endpoint = 'herpublicurl'
self.empty_args = Mock()
self.empty_args.os_username = ''
self.empty_args.os_tenant_name = ''
self.empty_args.os_auth_url = ''
self.empty_args.os_password = ''
self.empty_args.os_tenant_id = ''
self.empty_args.os_region_name = ''
self.empty_args.os_endpoint = ''
self.env_dict = {
'OS_USERNAME': 'johndoe',
'OS_TENANT_NAME': 'histenant',
'OS_AUTH_URL': 'hisauthurl',
'OS_PASSWORD': 'hispassword',
'OS_TENANT_ID': 'histenantid',
'OS_REGION_NAME': 'hisregion',
'OS_SERVICE_ENDPOINT': 'hispublicurl'
}
def test_create_with_args_and_env(self):
os = arguments.OpenstackOptions(self.args, self.env_dict)
self.assertIsInstance(os, arguments.OpenstackOptions)
def test_create_with_empty_args_and_empty_env(self):
os = arguments.OpenstackOptions(self.empty_args, self.env_dict)
self.assertIsInstance(os, arguments.OpenstackOptions)
def test_create_with_args_and_empty_env(self):
os = arguments.OpenstackOptions(self.args, {})
self.assertIsInstance(os, arguments.OpenstackOptions)
def test_create_raises_Exception_when_missing_username(self):
self.args.os_username = ''
self.assertRaises(Exception, arguments.OpenstackOptions, self.args, {})
def test_create_raises_Exception_when_missing_p(self):
self.args.os_password = ''
self.assertRaises(Exception, arguments.OpenstackOptions, self.args, {})
def test_create_raises_Exception_when_missing_parameter(self):
self.args.os_username = ''
self.assertRaises(Exception, arguments.OpenstackOptions, self.args, {})
def test_str(self):
os = arguments.OpenstackOptions(self.args, self.env_dict)
s = str(os)
self.assertIsInstance(s, str)
class TestGetArgs(unittest.TestCase):
@patch('freezer.scheduler.arguments.argparse.ArgumentParser')
def test_get_args_calls_add_argument(self, mock_ArgumentParser):
mock_arg_parser = Mock()
mock_ArgumentParser.return_value = mock_arg_parser
retval = arguments.get_args(['alpha', 'bravo'])
call_count = mock_arg_parser.add_argument.call_count
self.assertGreater(call_count, 15)