From 857235b7fae5dc04fa6c8ad5734bc909f16ea134 Mon Sep 17 00:00:00 2001 From: joelee Date: Thu, 2 Aug 2018 10:08:30 +0300 Subject: [PATCH] Add folder credential support implement functions to manage credential in Jenkins Folder job with rest api provided by : https://wiki.jenkins.io/display/JENKINS/Credentials+Plugin Change-Id: I9bcc3943e4ec705fe8705ad1d457b6fd4ad1024a --- jenkins/__init__.py | 192 +++++++++++++++++++++ tests/test_credential.py | 353 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 545 insertions(+) create mode 100644 tests/test_credential.py diff --git a/jenkins/__init__.py b/jenkins/__init__.py index 4216fa4..9932556 100755 --- a/jenkins/__init__.py +++ b/jenkins/__init__.py @@ -62,6 +62,7 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning from six.moves.http_client import BadStatusLine from six.moves.urllib.error import URLError from six.moves.urllib.parse import quote, urlencode, urljoin, urlparse +import xml.etree.ElementTree as ET from jenkins import plugins @@ -141,6 +142,14 @@ PROMOTION_INFO = '%(folder_url)sjob/%(short_name)s/promotion/api/json?depth=%(de DELETE_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/process/%(name)s/doDelete' CREATE_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/createProcess?name=%(name)s' CONFIG_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/process/%(name)s/config.xml' +LIST_CREDENTIALS = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \ + 'domain/%(domain_name)s/api/json?tree=credentials[id]' +CREATE_CREDENTIAL = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \ + 'domain/%(domain_name)s/createCredentials' +CONFIG_CREDENTIAL = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \ + 'domain/%(domain_name)s/credential/%(name)s/config.xml' +CREDENTIAL_INFO = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \ + 'domain/%(domain_name)s/credential/%(name)s/api/json?depth=0' QUIET_DOWN = 'quietDown' # for testing only @@ -1944,6 +1953,189 @@ class Jenkins(object): 'GET', self._build_url(CONFIG_PROMOTION, locals())) return self.jenkins_open(request) + def _get_tag_text(self, name, xml): + '''Get text of tag from xml + + :param name: XML tag name, ``str`` + :param xml: XML configuration, ``str`` + :returns: Text of tag, ``str`` + :throws: :class:`JenkinsException` whenever tag does not exist + or has invalidated text + ''' + tag = ET.fromstring(xml).find(name) + try: + text = tag.text.strip() + if text: + return text + raise JenkinsException("tag[%s] is invalidated" % name) + except AttributeError: + raise JenkinsException("tag[%s] is invalidated" % name) + + def assert_folder(self, name, exception_message='job[%s] is not a folder'): + '''Raise an exception if job is not Cloudbees Folder + + :param name: Name of job, ``str`` + :param exception_message: Message to use for the exception. + :throws: :class:`JenkinsException` whenever the job is + not Cloudbees Folder + ''' + if not self.is_folder(name): + raise JenkinsException(exception_message % name) + + def is_folder(self, name): + '''Check whether a job is Cloudbees Folder + + :param name: Job name, ``str`` + :returns: ``True`` if job is folder, ``False`` otherwise + ''' + return 'com.cloudbees.hudson.plugins.folder.Folder' \ + == self.get_job_info(name)['_class'] + + def assert_credential_exists(self, name, folder_name, domain_name='_', + exception_message='credential[%s] does not ' + 'exist in the domain[%s] of [%s]'): + '''Raise an exception if credential does not exist in domain of folder + + :param name: Name of credential, ``str`` + :param folder_name: Folder name, ``str`` + :param domain_name: Domain name, default is '_', ``str`` + :param exception_message: Message to use for the exception. + Formatted with ``name``, ``domain_name``, + and ``folder_name`` + :throws: :class:`JenkinsException` whenever the credentail + does not exist in domain of folder + ''' + if not self.credential_exists(name, folder_name, domain_name): + raise JenkinsException(exception_message + % (name, domain_name, folder_name)) + + def credential_exists(self, name, folder_name, domain_name='_'): + '''Check whether a credentail exists in domain of folder + + :param name: Name of credentail, ``str`` + :param folder_name: Folder name, ``str`` + :param domain_name: Domain name, default is '_', ``str`` + :returns: ``True`` if credentail exists, ``False`` otherwise + ''' + try: + return self.get_credential_info(name, folder_name, + domain_name)['id'] == name + except JenkinsException: + return False + + def get_credential_info(self, name, folder_name, domain_name='_'): + '''Get credential information dictionary in domain of folder + + :param name: Name of credentail, ``str`` + :param folder_name: folder_name, ``str`` + :param domain_name: Domain name, default is '_', ``str`` + :returns: Dictionary of credential info, ``dict`` + ''' + self.assert_folder(folder_name) + folder_url, short_name = self._get_job_folder(folder_name) + try: + response = self.jenkins_open(requests.Request( + 'GET', self._build_url(CREDENTIAL_INFO, locals()) + )) + if response: + return json.loads(response) + else: + raise JenkinsException('credential[%s] does not exist ' + 'in the domain[%s] of [%s]' + % (name, domain_name, folder_name)) + except (req_exc.HTTPError, NotFoundException): + raise JenkinsException('credential[%s] does not exist ' + 'in the domain[%s] of [%s]' + % (name, domain_name, folder_name)) + except ValueError: + raise JenkinsException( + 'Could not parse JSON info for credential[%s] ' + 'in the domain[%s] of [%s]' + % (name, domain_name, folder_name) + ) + + def get_credential_config(self, name, folder_name, domain_name='_'): + '''Get configuration of credential in domain of folder. + + :param name: Name of credentail, ``str`` + :param folder_name: Folder name, ``str`` + :param domain_name: Domain name, default is '_', ``str`` + :returns: Credential configuration (XML format) + ''' + self.assert_folder(folder_name) + folder_url, short_name = self._get_job_folder(folder_name) + return self.jenkins_open(requests.Request( + 'GET', self._build_url(CONFIG_CREDENTIAL, locals()) + )) + + def create_credential(self, folder_name, config_xml, + domain_name='_'): + '''Create credentail in domain of folder + + :param folder_name: Folder name, ``str`` + :param config_xml: New XML configuration, ``str`` + :param domain_name: Domain name, default is '_', ``str`` + ''' + folder_url, short_name = self._get_job_folder(folder_name) + name = self._get_tag_text('id', config_xml) + if self.credential_exists(name, folder_name, domain_name): + raise JenkinsException('credential[%s] already exists ' + 'in the domain[%s] of [%s]' + % (name, domain_name, folder_name)) + + self.jenkins_open(requests.Request( + 'POST', self._build_url(CREATE_CREDENTIAL, locals()), + data=config_xml.encode('utf-8'), + headers=DEFAULT_HEADERS + )) + self.assert_credential_exists(name, folder_name, domain_name, + 'create[%s] failed in the ' + 'domain[%s] of [%s]') + + def delete_credential(self, name, folder_name, domain_name='_'): + '''Delete credential from domain of folder + + :param name: Name of credentail, ``str`` + :param folder_name: Folder name, ``str`` + :param domain_name: Domain name, default is '_', ``str`` + ''' + folder_url, short_name = self._get_job_folder(folder_name) + self.jenkins_open(requests.Request( + 'DELETE', self._build_url(CONFIG_CREDENTIAL, locals()) + )) + if self.credential_exists(name, folder_name, domain_name): + raise JenkinsException('delete credential[%s] from ' + 'domain[%s] of [%s] failed' + % (name, domain_name, folder_name)) + + def reconfig_credential(self, folder_name, config_xml, domain_name='_'): + '''Reconfig credential with new config in domain of folder + + :param folder_name: Folder name, ``str`` + :param config_xml: New XML configuration, ``str`` + :param domain_name: Domain name, default is '_', ``str`` + ''' + folder_url, short_name = self._get_job_folder(folder_name) + name = self._get_tag_text('id', config_xml) + self.assert_credential_exists(name, folder_name, domain_name) + self.jenkins_open(requests.Request( + 'POST', self._build_url(CONFIG_CREDENTIAL, locals()) + )) + + def list_credentials(self, folder_name, domain_name='_'): + '''List credentials in domain of folder + + :param folder_name: Folder name, ``str`` + :param domain_name: Domain name, default is '_', ``str`` + :returns: Credentials list, ``list`` + ''' + self.assert_folder(folder_name) + folder_url, short_name = self._get_job_folder(folder_name) + response = self.jenkins_open(requests.Request( + 'GET', self._build_url(LIST_CREDENTIALS, locals()) + )) + return json.loads(response)['credentials'] + def quiet_down(self): '''Prepare Jenkins for shutdown. diff --git a/tests/test_credential.py b/tests/test_credential.py new file mode 100644 index 0000000..8e4e5f7 --- /dev/null +++ b/tests/test_credential.py @@ -0,0 +1,353 @@ +import json +from mock import patch + +import jenkins +from tests.base import JenkinsTestBase + + +class JenkinsCredentialTestBase(JenkinsTestBase): + config_xml = """ + GLOBAL + Test Credential + Test-User + secret123 + """ + + +class JenkinsGetTagTextTest(JenkinsCredentialTestBase): + + def test_simple(self): + name_to_return = self.j._get_tag_text('id', self.config_xml) + self.assertEqual('Test Credential', name_to_return) + + def test_failed(self): + with self.assertRaises(jenkins.JenkinsException) as context_manager: + self.j._get_tag_text('id', '') + self.assertEqual(str(context_manager.exception), + 'tag[id] is invalidated') + + with self.assertRaises(jenkins.JenkinsException) as context_manager: + self.j._get_tag_text('id', '') + self.assertEqual(str(context_manager.exception), + 'tag[id] is invalidated') + + with self.assertRaises(jenkins.JenkinsException) as context_manager: + self.j._get_tag_text('id', ' ') + self.assertEqual(str(context_manager.exception), + 'tag[id] is invalidated') + + +class JenkinsIsFolderTest(JenkinsCredentialTestBase): + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_is_folder(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + ] + self.assertTrue(self.j.is_folder('Test Folder')) + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_is_not_folder(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'org.jenkinsci.plugins.workflow.job.WorkflowJob'}), + ] + self.assertFalse(self.j.is_folder('Test Job')) + + +class JenkinsAssertFolderTest(JenkinsCredentialTestBase): + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_is_folder(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + ] + self.j.assert_folder('Test Folder') + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_is_not_folder(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'org.jenkinsci.plugins.workflow.job.WorkflowJob'}), + ] + with self.assertRaises(jenkins.JenkinsException) as context_manager: + self.j.assert_folder('Test Job') + self.assertEqual(str(context_manager.exception), + 'job[Test Job] is not a folder') + + +class JenkinsAssertCredentialTest(JenkinsCredentialTestBase): + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_credential_missing(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + jenkins.NotFoundException() + ] + + with self.assertRaises(jenkins.JenkinsException) as context_manager: + self.j.assert_credential_exists('NonExistent', 'TestFoler') + self.assertEqual( + str(context_manager.exception), + 'credential[NonExistent] does not exist' + ' in the domain[_] of [TestFoler]') + self._check_requests(jenkins_mock.call_args_list) + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_credential_exists(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + json.dumps({'id': 'ExistingCredential'}) + ] + self.j.assert_credential_exists('ExistingCredential', 'TestFoler') + self._check_requests(jenkins_mock.call_args_list) + + +class JenkinsCredentialExistsTest(JenkinsCredentialTestBase): + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_credential_missing(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + jenkins.NotFoundException() + ] + + self.assertEqual(self.j.credential_exists('NonExistent', 'TestFolder'), + False) + self._check_requests(jenkins_mock.call_args_list) + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_credential_exists(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + json.dumps({'id': 'ExistingCredential'}) + ] + + self.assertEqual(self.j.credential_exists('ExistingCredential', + 'TestFolder'), + True) + self._check_requests(jenkins_mock.call_args_list) + + +class JenkinsGetCredentialInfoTest(JenkinsCredentialTestBase): + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_simple(self, jenkins_mock): + credential_info_to_return = {'id': 'ExistingCredential'} + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + json.dumps(credential_info_to_return) + ] + + credential_info = self.j.get_credential_info('ExistingCredential', 'TestFolder') + + self.assertEqual(credential_info, credential_info_to_return) + self.assertEqual( + jenkins_mock.call_args[0][0].url, + self.make_url('job/TestFolder/credentials/store/folder/' + 'domain/_/credential/ExistingCredential/api/json?depth=0')) + self._check_requests(jenkins_mock.call_args_list) + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_nonexistent(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + None, + ] + + with self.assertRaises(jenkins.JenkinsException) as context_manager: + self.j.get_credential_info('NonExistent', 'TestFolder') + + self.assertEqual( + str(context_manager.exception), + 'credential[NonExistent] does not exist ' + 'in the domain[_] of [TestFolder]') + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_invalid_json(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + '{invalid_json}' + ] + + with self.assertRaises(jenkins.JenkinsException) as context_manager: + self.j.get_credential_info('NonExistent', 'TestFolder') + + self.assertEqual( + str(context_manager.exception), + 'Could not parse JSON info for credential[NonExistent]' + ' in the domain[_] of [TestFolder]') + + +class JenkinsGetCredentialConfigTest(JenkinsCredentialTestBase): + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_encodes_credential_name(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + None, + ] + self.j.get_credential_config(u'Test Credential', u'Test Folder') + + self.assertEqual( + jenkins_mock.call_args_list[1][0][0].url, + self.make_url('job/Test%20Folder/credentials/store/folder/domain/' + '_/credential/Test%20Credential/config.xml')) + self._check_requests(jenkins_mock.call_args_list) + + +class JenkinsCreateCredentialTest(JenkinsCredentialTestBase): + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_simple(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + jenkins.NotFoundException(), + None, + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + json.dumps({'id': 'Test Credential'}), + ] + + self.j.create_credential('Test Folder', self.config_xml) + self.assertEqual( + jenkins_mock.call_args_list[1][0][0].url, + self.make_url('job/Test%20Folder/credentials/store/folder/' + 'domain/_/credential/Test%20Credential/api/json?depth=0')) + + self.assertEqual( + jenkins_mock.call_args_list[2][0][0].url, + self.make_url('job/Test%20Folder/credentials/store/folder/' + 'domain/_/createCredentials')) + + self.assertEqual( + jenkins_mock.call_args_list[4][0][0].url, + self.make_url('job/Test%20Folder/credentials/store/folder/' + 'domain/_/credential/Test%20Credential/api/json?depth=0')) + self._check_requests(jenkins_mock.call_args_list) + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_already_exists(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + json.dumps({'id': 'Test Credential'}), + ] + + with self.assertRaises(jenkins.JenkinsException) as context_manager: + self.j.create_credential('Test Folder', self.config_xml) + self.assertEqual( + jenkins_mock.call_args_list[1][0][0].url, + self.make_url('job/Test%20Folder/credentials/store/folder/' + 'domain/_/credential/Test%20Credential/api/json?depth=0')) + + self.assertEqual( + str(context_manager.exception), + 'credential[Test Credential] already exists' + ' in the domain[_] of [Test Folder]') + self._check_requests(jenkins_mock.call_args_list) + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_failed(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + jenkins.NotFoundException(), + None, + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + None, + ] + + with self.assertRaises(jenkins.JenkinsException) as context_manager: + self.j.create_credential('Test Folder', self.config_xml) + self.assertEqual( + jenkins_mock.call_args_list[1][0][0].url, + self.make_url('job/Test%20Folder/credentials/store/folder/' + 'domain/_/credential/Test%20Credential/api/json?depth=0')) + self.assertEqual( + jenkins_mock.call_args_list[2][0][0].url, + self.make_url('job/Test%20Folder/credentials/store/' + 'folder/domain/_/createCredentials')) + self.assertEqual( + jenkins_mock.call_args_list[4][0][0].url, + self.make_url('job/Test%20Folder/credentials/store/folder/' + 'domain/_/credential/Test%20Credential/api/json?depth=0')) + self.assertEqual( + str(context_manager.exception), + 'create[Test Credential] failed in the domain[_] of [Test Folder]') + self._check_requests(jenkins_mock.call_args_list) + + +class JenkinsDeleteCredentialTest(JenkinsCredentialTestBase): + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_simple(self, jenkins_mock): + jenkins_mock.side_effect = [ + True, + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + jenkins.NotFoundException(), + ] + + self.j.delete_credential(u'Test Credential', 'TestFolder') + + self.assertEqual( + jenkins_mock.call_args_list[0][0][0].url, + self.make_url('job/TestFolder/credentials/store/folder/domain/' + '_/credential/Test%20Credential/config.xml')) + self._check_requests(jenkins_mock.call_args_list) + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_failed(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'id': 'ExistingCredential'}), + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + json.dumps({'id': 'ExistingCredential'}) + ] + + with self.assertRaises(jenkins.JenkinsException) as context_manager: + self.j.delete_credential(u'ExistingCredential', 'TestFolder') + self.assertEqual( + jenkins_mock.call_args_list[0][0][0].url, + self.make_url('job/TestFolder/credentials/store/folder/' + 'domain/_/credential/ExistingCredential/config.xml')) + self.assertEqual( + str(context_manager.exception), + 'delete credential[ExistingCredential] from ' + 'domain[_] of [TestFolder] failed') + self._check_requests(jenkins_mock.call_args_list) + + +class JenkinsReconfigCredentialTest(JenkinsCredentialTestBase): + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_simple(self, jenkins_mock): + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + json.dumps({'id': 'Test Credential'}), + None + ] + + self.j.reconfig_credential(u'Test Folder', self.config_xml) + + self.assertEqual( + jenkins_mock.call_args_list[1][0][0].url, + self.make_url('job/Test%20Folder/credentials/store/folder/domain/' + '_/credential/Test%20Credential/api/json?depth=0')) + self.assertEqual( + jenkins_mock.call_args_list[2][0][0].url, + self.make_url('job/Test%20Folder/credentials/store/folder/domain/' + '_/credential/Test%20Credential/config.xml')) + self._check_requests(jenkins_mock.call_args_list) + + +class JenkinsListCredentialConfigTest(JenkinsCredentialTestBase): + + @patch.object(jenkins.Jenkins, 'jenkins_open') + def test_simple(self, jenkins_mock): + credentials_to_return = [{'id': 'Test Credential'}] + jenkins_mock.side_effect = [ + json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}), + json.dumps({'credentials': [{'id': 'Test Credential'}]}), + ] + credentials = self.j.list_credentials(u'Test Folder') + self.assertEqual(credentials, credentials_to_return) + self.assertEqual( + jenkins_mock.call_args_list[1][0][0].url, + self.make_url('job/Test%20Folder/credentials/store/folder/domain/' + '_/api/json?tree=credentials[id]')) + self._check_requests(jenkins_mock.call_args_list)