diff --git a/jenkins/__init__.py b/jenkins/__init__.py
index 7c9c577..b1ec039 100755
--- a/jenkins/__init__.py
+++ b/jenkins/__init__.py
@@ -62,6 +62,7 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
from six.moves.http_client import BadStatusLine
from six.moves.urllib.error import URLError
from six.moves.urllib.parse import quote, urlencode, urljoin, urlparse
+import xml.etree.ElementTree as ET
from jenkins import plugins
@@ -141,6 +142,14 @@ PROMOTION_INFO = '%(folder_url)sjob/%(short_name)s/promotion/api/json?depth=%(de
DELETE_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/process/%(name)s/doDelete'
CREATE_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/createProcess?name=%(name)s'
CONFIG_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/process/%(name)s/config.xml'
+LIST_CREDENTIALS = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
+ 'domain/%(domain_name)s/api/json?tree=credentials[id]'
+CREATE_CREDENTIAL = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
+ 'domain/%(domain_name)s/createCredentials'
+CONFIG_CREDENTIAL = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
+ 'domain/%(domain_name)s/credential/%(name)s/config.xml'
+CREDENTIAL_INFO = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
+ 'domain/%(domain_name)s/credential/%(name)s/api/json?depth=0'
QUIET_DOWN = 'quietDown'
# for testing only
@@ -1932,6 +1941,189 @@ class Jenkins(object):
'GET', self._build_url(CONFIG_PROMOTION, locals()))
return self.jenkins_open(request)
+ def _get_tag_text(self, name, xml):
+ '''Get text of tag from xml
+
+ :param name: XML tag name, ``str``
+ :param xml: XML configuration, ``str``
+ :returns: Text of tag, ``str``
+ :throws: :class:`JenkinsException` whenever tag does not exist
+ or has invalidated text
+ '''
+ tag = ET.fromstring(xml).find(name)
+ try:
+ text = tag.text.strip()
+ if text:
+ return text
+ raise JenkinsException("tag[%s] is invalidated" % name)
+ except AttributeError:
+ raise JenkinsException("tag[%s] is invalidated" % name)
+
+ def assert_folder(self, name, exception_message='job[%s] is not a folder'):
+ '''Raise an exception if job is not Cloudbees Folder
+
+ :param name: Name of job, ``str``
+ :param exception_message: Message to use for the exception.
+ :throws: :class:`JenkinsException` whenever the job is
+ not Cloudbees Folder
+ '''
+ if not self.is_folder(name):
+ raise JenkinsException(exception_message % name)
+
+ def is_folder(self, name):
+ '''Check whether a job is Cloudbees Folder
+
+ :param name: Job name, ``str``
+ :returns: ``True`` if job is folder, ``False`` otherwise
+ '''
+ return 'com.cloudbees.hudson.plugins.folder.Folder' \
+ == self.get_job_info(name)['_class']
+
+ def assert_credential_exists(self, name, folder_name, domain_name='_',
+ exception_message='credential[%s] does not '
+ 'exist in the domain[%s] of [%s]'):
+ '''Raise an exception if credential does not exist in domain of folder
+
+ :param name: Name of credential, ``str``
+ :param folder_name: Folder name, ``str``
+ :param domain_name: Domain name, default is '_', ``str``
+ :param exception_message: Message to use for the exception.
+ Formatted with ``name``, ``domain_name``,
+ and ``folder_name``
+ :throws: :class:`JenkinsException` whenever the credentail
+ does not exist in domain of folder
+ '''
+ if not self.credential_exists(name, folder_name, domain_name):
+ raise JenkinsException(exception_message
+ % (name, domain_name, folder_name))
+
+ def credential_exists(self, name, folder_name, domain_name='_'):
+ '''Check whether a credentail exists in domain of folder
+
+ :param name: Name of credentail, ``str``
+ :param folder_name: Folder name, ``str``
+ :param domain_name: Domain name, default is '_', ``str``
+ :returns: ``True`` if credentail exists, ``False`` otherwise
+ '''
+ try:
+ return self.get_credential_info(name, folder_name,
+ domain_name)['id'] == name
+ except JenkinsException:
+ return False
+
+ def get_credential_info(self, name, folder_name, domain_name='_'):
+ '''Get credential information dictionary in domain of folder
+
+ :param name: Name of credentail, ``str``
+ :param folder_name: folder_name, ``str``
+ :param domain_name: Domain name, default is '_', ``str``
+ :returns: Dictionary of credential info, ``dict``
+ '''
+ self.assert_folder(folder_name)
+ folder_url, short_name = self._get_job_folder(folder_name)
+ try:
+ response = self.jenkins_open(requests.Request(
+ 'GET', self._build_url(CREDENTIAL_INFO, locals())
+ ))
+ if response:
+ return json.loads(response)
+ else:
+ raise JenkinsException('credential[%s] does not exist '
+ 'in the domain[%s] of [%s]'
+ % (name, domain_name, folder_name))
+ except (req_exc.HTTPError, NotFoundException):
+ raise JenkinsException('credential[%s] does not exist '
+ 'in the domain[%s] of [%s]'
+ % (name, domain_name, folder_name))
+ except ValueError:
+ raise JenkinsException(
+ 'Could not parse JSON info for credential[%s] '
+ 'in the domain[%s] of [%s]'
+ % (name, domain_name, folder_name)
+ )
+
+ def get_credential_config(self, name, folder_name, domain_name='_'):
+ '''Get configuration of credential in domain of folder.
+
+ :param name: Name of credentail, ``str``
+ :param folder_name: Folder name, ``str``
+ :param domain_name: Domain name, default is '_', ``str``
+ :returns: Credential configuration (XML format)
+ '''
+ self.assert_folder(folder_name)
+ folder_url, short_name = self._get_job_folder(folder_name)
+ return self.jenkins_open(requests.Request(
+ 'GET', self._build_url(CONFIG_CREDENTIAL, locals())
+ ))
+
+ def create_credential(self, folder_name, config_xml,
+ domain_name='_'):
+ '''Create credentail in domain of folder
+
+ :param folder_name: Folder name, ``str``
+ :param config_xml: New XML configuration, ``str``
+ :param domain_name: Domain name, default is '_', ``str``
+ '''
+ folder_url, short_name = self._get_job_folder(folder_name)
+ name = self._get_tag_text('id', config_xml)
+ if self.credential_exists(name, folder_name, domain_name):
+ raise JenkinsException('credential[%s] already exists '
+ 'in the domain[%s] of [%s]'
+ % (name, domain_name, folder_name))
+
+ self.jenkins_open(requests.Request(
+ 'POST', self._build_url(CREATE_CREDENTIAL, locals()),
+ data=config_xml.encode('utf-8'),
+ headers=DEFAULT_HEADERS
+ ))
+ self.assert_credential_exists(name, folder_name, domain_name,
+ 'create[%s] failed in the '
+ 'domain[%s] of [%s]')
+
+ def delete_credential(self, name, folder_name, domain_name='_'):
+ '''Delete credential from domain of folder
+
+ :param name: Name of credentail, ``str``
+ :param folder_name: Folder name, ``str``
+ :param domain_name: Domain name, default is '_', ``str``
+ '''
+ folder_url, short_name = self._get_job_folder(folder_name)
+ self.jenkins_open(requests.Request(
+ 'DELETE', self._build_url(CONFIG_CREDENTIAL, locals())
+ ))
+ if self.credential_exists(name, folder_name, domain_name):
+ raise JenkinsException('delete credential[%s] from '
+ 'domain[%s] of [%s] failed'
+ % (name, domain_name, folder_name))
+
+ def reconfig_credential(self, folder_name, config_xml, domain_name='_'):
+ '''Reconfig credential with new config in domain of folder
+
+ :param folder_name: Folder name, ``str``
+ :param config_xml: New XML configuration, ``str``
+ :param domain_name: Domain name, default is '_', ``str``
+ '''
+ folder_url, short_name = self._get_job_folder(folder_name)
+ name = self._get_tag_text('id', config_xml)
+ self.assert_credential_exists(name, folder_name, domain_name)
+ self.jenkins_open(requests.Request(
+ 'POST', self._build_url(CONFIG_CREDENTIAL, locals())
+ ))
+
+ def list_credentials(self, folder_name, domain_name='_'):
+ '''List credentials in domain of folder
+
+ :param folder_name: Folder name, ``str``
+ :param domain_name: Domain name, default is '_', ``str``
+ :returns: Credentials list, ``list``
+ '''
+ self.assert_folder(folder_name)
+ folder_url, short_name = self._get_job_folder(folder_name)
+ response = self.jenkins_open(requests.Request(
+ 'GET', self._build_url(LIST_CREDENTIALS, locals())
+ ))
+ return json.loads(response)['credentials']
+
def quiet_down(self):
'''Prepare Jenkins for shutdown.
diff --git a/tests/test_credential.py b/tests/test_credential.py
new file mode 100644
index 0000000..8e4e5f7
--- /dev/null
+++ b/tests/test_credential.py
@@ -0,0 +1,353 @@
+import json
+from mock import patch
+
+import jenkins
+from tests.base import JenkinsTestBase
+
+
+class JenkinsCredentialTestBase(JenkinsTestBase):
+ config_xml = """
+ GLOBAL
+ Test Credential
+ Test-User
+ secret123
+ """
+
+
+class JenkinsGetTagTextTest(JenkinsCredentialTestBase):
+
+ def test_simple(self):
+ name_to_return = self.j._get_tag_text('id', self.config_xml)
+ self.assertEqual('Test Credential', name_to_return)
+
+ def test_failed(self):
+ with self.assertRaises(jenkins.JenkinsException) as context_manager:
+ self.j._get_tag_text('id', '')
+ self.assertEqual(str(context_manager.exception),
+ 'tag[id] is invalidated')
+
+ with self.assertRaises(jenkins.JenkinsException) as context_manager:
+ self.j._get_tag_text('id', '')
+ self.assertEqual(str(context_manager.exception),
+ 'tag[id] is invalidated')
+
+ with self.assertRaises(jenkins.JenkinsException) as context_manager:
+ self.j._get_tag_text('id', ' ')
+ self.assertEqual(str(context_manager.exception),
+ 'tag[id] is invalidated')
+
+
+class JenkinsIsFolderTest(JenkinsCredentialTestBase):
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_is_folder(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ ]
+ self.assertTrue(self.j.is_folder('Test Folder'))
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_is_not_folder(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'org.jenkinsci.plugins.workflow.job.WorkflowJob'}),
+ ]
+ self.assertFalse(self.j.is_folder('Test Job'))
+
+
+class JenkinsAssertFolderTest(JenkinsCredentialTestBase):
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_is_folder(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ ]
+ self.j.assert_folder('Test Folder')
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_is_not_folder(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'org.jenkinsci.plugins.workflow.job.WorkflowJob'}),
+ ]
+ with self.assertRaises(jenkins.JenkinsException) as context_manager:
+ self.j.assert_folder('Test Job')
+ self.assertEqual(str(context_manager.exception),
+ 'job[Test Job] is not a folder')
+
+
+class JenkinsAssertCredentialTest(JenkinsCredentialTestBase):
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_credential_missing(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ jenkins.NotFoundException()
+ ]
+
+ with self.assertRaises(jenkins.JenkinsException) as context_manager:
+ self.j.assert_credential_exists('NonExistent', 'TestFoler')
+ self.assertEqual(
+ str(context_manager.exception),
+ 'credential[NonExistent] does not exist'
+ ' in the domain[_] of [TestFoler]')
+ self._check_requests(jenkins_mock.call_args_list)
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_credential_exists(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ json.dumps({'id': 'ExistingCredential'})
+ ]
+ self.j.assert_credential_exists('ExistingCredential', 'TestFoler')
+ self._check_requests(jenkins_mock.call_args_list)
+
+
+class JenkinsCredentialExistsTest(JenkinsCredentialTestBase):
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_credential_missing(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ jenkins.NotFoundException()
+ ]
+
+ self.assertEqual(self.j.credential_exists('NonExistent', 'TestFolder'),
+ False)
+ self._check_requests(jenkins_mock.call_args_list)
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_credential_exists(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ json.dumps({'id': 'ExistingCredential'})
+ ]
+
+ self.assertEqual(self.j.credential_exists('ExistingCredential',
+ 'TestFolder'),
+ True)
+ self._check_requests(jenkins_mock.call_args_list)
+
+
+class JenkinsGetCredentialInfoTest(JenkinsCredentialTestBase):
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_simple(self, jenkins_mock):
+ credential_info_to_return = {'id': 'ExistingCredential'}
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ json.dumps(credential_info_to_return)
+ ]
+
+ credential_info = self.j.get_credential_info('ExistingCredential', 'TestFolder')
+
+ self.assertEqual(credential_info, credential_info_to_return)
+ self.assertEqual(
+ jenkins_mock.call_args[0][0].url,
+ self.make_url('job/TestFolder/credentials/store/folder/'
+ 'domain/_/credential/ExistingCredential/api/json?depth=0'))
+ self._check_requests(jenkins_mock.call_args_list)
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_nonexistent(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ None,
+ ]
+
+ with self.assertRaises(jenkins.JenkinsException) as context_manager:
+ self.j.get_credential_info('NonExistent', 'TestFolder')
+
+ self.assertEqual(
+ str(context_manager.exception),
+ 'credential[NonExistent] does not exist '
+ 'in the domain[_] of [TestFolder]')
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_invalid_json(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ '{invalid_json}'
+ ]
+
+ with self.assertRaises(jenkins.JenkinsException) as context_manager:
+ self.j.get_credential_info('NonExistent', 'TestFolder')
+
+ self.assertEqual(
+ str(context_manager.exception),
+ 'Could not parse JSON info for credential[NonExistent]'
+ ' in the domain[_] of [TestFolder]')
+
+
+class JenkinsGetCredentialConfigTest(JenkinsCredentialTestBase):
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_encodes_credential_name(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ None,
+ ]
+ self.j.get_credential_config(u'Test Credential', u'Test Folder')
+
+ self.assertEqual(
+ jenkins_mock.call_args_list[1][0][0].url,
+ self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
+ '_/credential/Test%20Credential/config.xml'))
+ self._check_requests(jenkins_mock.call_args_list)
+
+
+class JenkinsCreateCredentialTest(JenkinsCredentialTestBase):
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_simple(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ jenkins.NotFoundException(),
+ None,
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ json.dumps({'id': 'Test Credential'}),
+ ]
+
+ self.j.create_credential('Test Folder', self.config_xml)
+ self.assertEqual(
+ jenkins_mock.call_args_list[1][0][0].url,
+ self.make_url('job/Test%20Folder/credentials/store/folder/'
+ 'domain/_/credential/Test%20Credential/api/json?depth=0'))
+
+ self.assertEqual(
+ jenkins_mock.call_args_list[2][0][0].url,
+ self.make_url('job/Test%20Folder/credentials/store/folder/'
+ 'domain/_/createCredentials'))
+
+ self.assertEqual(
+ jenkins_mock.call_args_list[4][0][0].url,
+ self.make_url('job/Test%20Folder/credentials/store/folder/'
+ 'domain/_/credential/Test%20Credential/api/json?depth=0'))
+ self._check_requests(jenkins_mock.call_args_list)
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_already_exists(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ json.dumps({'id': 'Test Credential'}),
+ ]
+
+ with self.assertRaises(jenkins.JenkinsException) as context_manager:
+ self.j.create_credential('Test Folder', self.config_xml)
+ self.assertEqual(
+ jenkins_mock.call_args_list[1][0][0].url,
+ self.make_url('job/Test%20Folder/credentials/store/folder/'
+ 'domain/_/credential/Test%20Credential/api/json?depth=0'))
+
+ self.assertEqual(
+ str(context_manager.exception),
+ 'credential[Test Credential] already exists'
+ ' in the domain[_] of [Test Folder]')
+ self._check_requests(jenkins_mock.call_args_list)
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_failed(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ jenkins.NotFoundException(),
+ None,
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ None,
+ ]
+
+ with self.assertRaises(jenkins.JenkinsException) as context_manager:
+ self.j.create_credential('Test Folder', self.config_xml)
+ self.assertEqual(
+ jenkins_mock.call_args_list[1][0][0].url,
+ self.make_url('job/Test%20Folder/credentials/store/folder/'
+ 'domain/_/credential/Test%20Credential/api/json?depth=0'))
+ self.assertEqual(
+ jenkins_mock.call_args_list[2][0][0].url,
+ self.make_url('job/Test%20Folder/credentials/store/'
+ 'folder/domain/_/createCredentials'))
+ self.assertEqual(
+ jenkins_mock.call_args_list[4][0][0].url,
+ self.make_url('job/Test%20Folder/credentials/store/folder/'
+ 'domain/_/credential/Test%20Credential/api/json?depth=0'))
+ self.assertEqual(
+ str(context_manager.exception),
+ 'create[Test Credential] failed in the domain[_] of [Test Folder]')
+ self._check_requests(jenkins_mock.call_args_list)
+
+
+class JenkinsDeleteCredentialTest(JenkinsCredentialTestBase):
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_simple(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ True,
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ jenkins.NotFoundException(),
+ ]
+
+ self.j.delete_credential(u'Test Credential', 'TestFolder')
+
+ self.assertEqual(
+ jenkins_mock.call_args_list[0][0][0].url,
+ self.make_url('job/TestFolder/credentials/store/folder/domain/'
+ '_/credential/Test%20Credential/config.xml'))
+ self._check_requests(jenkins_mock.call_args_list)
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_failed(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'id': 'ExistingCredential'}),
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ json.dumps({'id': 'ExistingCredential'})
+ ]
+
+ with self.assertRaises(jenkins.JenkinsException) as context_manager:
+ self.j.delete_credential(u'ExistingCredential', 'TestFolder')
+ self.assertEqual(
+ jenkins_mock.call_args_list[0][0][0].url,
+ self.make_url('job/TestFolder/credentials/store/folder/'
+ 'domain/_/credential/ExistingCredential/config.xml'))
+ self.assertEqual(
+ str(context_manager.exception),
+ 'delete credential[ExistingCredential] from '
+ 'domain[_] of [TestFolder] failed')
+ self._check_requests(jenkins_mock.call_args_list)
+
+
+class JenkinsReconfigCredentialTest(JenkinsCredentialTestBase):
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_simple(self, jenkins_mock):
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ json.dumps({'id': 'Test Credential'}),
+ None
+ ]
+
+ self.j.reconfig_credential(u'Test Folder', self.config_xml)
+
+ self.assertEqual(
+ jenkins_mock.call_args_list[1][0][0].url,
+ self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
+ '_/credential/Test%20Credential/api/json?depth=0'))
+ self.assertEqual(
+ jenkins_mock.call_args_list[2][0][0].url,
+ self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
+ '_/credential/Test%20Credential/config.xml'))
+ self._check_requests(jenkins_mock.call_args_list)
+
+
+class JenkinsListCredentialConfigTest(JenkinsCredentialTestBase):
+
+ @patch.object(jenkins.Jenkins, 'jenkins_open')
+ def test_simple(self, jenkins_mock):
+ credentials_to_return = [{'id': 'Test Credential'}]
+ jenkins_mock.side_effect = [
+ json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
+ json.dumps({'credentials': [{'id': 'Test Credential'}]}),
+ ]
+ credentials = self.j.list_credentials(u'Test Folder')
+ self.assertEqual(credentials, credentials_to_return)
+ self.assertEqual(
+ jenkins_mock.call_args_list[1][0][0].url,
+ self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
+ '_/api/json?tree=credentials[id]'))
+ self._check_requests(jenkins_mock.call_args_list)