Merge "Add folder credential support"

This commit is contained in:
Zuul 2018-09-21 13:46:39 +00:00 committed by Gerrit Code Review
commit ede93173e3
2 changed files with 545 additions and 0 deletions

View File

@ -62,6 +62,7 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
from six.moves.http_client import BadStatusLine
from six.moves.urllib.error import URLError
from six.moves.urllib.parse import quote, urlencode, urljoin, urlparse
import xml.etree.ElementTree as ET
from jenkins import plugins
@ -141,6 +142,14 @@ PROMOTION_INFO = '%(folder_url)sjob/%(short_name)s/promotion/api/json?depth=%(de
DELETE_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/process/%(name)s/doDelete'
CREATE_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/createProcess?name=%(name)s'
CONFIG_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/process/%(name)s/config.xml'
LIST_CREDENTIALS = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
'domain/%(domain_name)s/api/json?tree=credentials[id]'
CREATE_CREDENTIAL = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
'domain/%(domain_name)s/createCredentials'
CONFIG_CREDENTIAL = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
'domain/%(domain_name)s/credential/%(name)s/config.xml'
CREDENTIAL_INFO = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
'domain/%(domain_name)s/credential/%(name)s/api/json?depth=0'
QUIET_DOWN = 'quietDown'
# for testing only
@ -1932,6 +1941,189 @@ class Jenkins(object):
'GET', self._build_url(CONFIG_PROMOTION, locals()))
return self.jenkins_open(request)
def _get_tag_text(self, name, xml):
'''Get text of tag from xml
:param name: XML tag name, ``str``
:param xml: XML configuration, ``str``
:returns: Text of tag, ``str``
:throws: :class:`JenkinsException` whenever tag does not exist
or has invalidated text
'''
tag = ET.fromstring(xml).find(name)
try:
text = tag.text.strip()
if text:
return text
raise JenkinsException("tag[%s] is invalidated" % name)
except AttributeError:
raise JenkinsException("tag[%s] is invalidated" % name)
def assert_folder(self, name, exception_message='job[%s] is not a folder'):
'''Raise an exception if job is not Cloudbees Folder
:param name: Name of job, ``str``
:param exception_message: Message to use for the exception.
:throws: :class:`JenkinsException` whenever the job is
not Cloudbees Folder
'''
if not self.is_folder(name):
raise JenkinsException(exception_message % name)
def is_folder(self, name):
'''Check whether a job is Cloudbees Folder
:param name: Job name, ``str``
:returns: ``True`` if job is folder, ``False`` otherwise
'''
return 'com.cloudbees.hudson.plugins.folder.Folder' \
== self.get_job_info(name)['_class']
def assert_credential_exists(self, name, folder_name, domain_name='_',
exception_message='credential[%s] does not '
'exist in the domain[%s] of [%s]'):
'''Raise an exception if credential does not exist in domain of folder
:param name: Name of credential, ``str``
:param folder_name: Folder name, ``str``
:param domain_name: Domain name, default is '_', ``str``
:param exception_message: Message to use for the exception.
Formatted with ``name``, ``domain_name``,
and ``folder_name``
:throws: :class:`JenkinsException` whenever the credentail
does not exist in domain of folder
'''
if not self.credential_exists(name, folder_name, domain_name):
raise JenkinsException(exception_message
% (name, domain_name, folder_name))
def credential_exists(self, name, folder_name, domain_name='_'):
'''Check whether a credentail exists in domain of folder
:param name: Name of credentail, ``str``
:param folder_name: Folder name, ``str``
:param domain_name: Domain name, default is '_', ``str``
:returns: ``True`` if credentail exists, ``False`` otherwise
'''
try:
return self.get_credential_info(name, folder_name,
domain_name)['id'] == name
except JenkinsException:
return False
def get_credential_info(self, name, folder_name, domain_name='_'):
'''Get credential information dictionary in domain of folder
:param name: Name of credentail, ``str``
:param folder_name: folder_name, ``str``
:param domain_name: Domain name, default is '_', ``str``
:returns: Dictionary of credential info, ``dict``
'''
self.assert_folder(folder_name)
folder_url, short_name = self._get_job_folder(folder_name)
try:
response = self.jenkins_open(requests.Request(
'GET', self._build_url(CREDENTIAL_INFO, locals())
))
if response:
return json.loads(response)
else:
raise JenkinsException('credential[%s] does not exist '
'in the domain[%s] of [%s]'
% (name, domain_name, folder_name))
except (req_exc.HTTPError, NotFoundException):
raise JenkinsException('credential[%s] does not exist '
'in the domain[%s] of [%s]'
% (name, domain_name, folder_name))
except ValueError:
raise JenkinsException(
'Could not parse JSON info for credential[%s] '
'in the domain[%s] of [%s]'
% (name, domain_name, folder_name)
)
def get_credential_config(self, name, folder_name, domain_name='_'):
'''Get configuration of credential in domain of folder.
:param name: Name of credentail, ``str``
:param folder_name: Folder name, ``str``
:param domain_name: Domain name, default is '_', ``str``
:returns: Credential configuration (XML format)
'''
self.assert_folder(folder_name)
folder_url, short_name = self._get_job_folder(folder_name)
return self.jenkins_open(requests.Request(
'GET', self._build_url(CONFIG_CREDENTIAL, locals())
))
def create_credential(self, folder_name, config_xml,
domain_name='_'):
'''Create credentail in domain of folder
:param folder_name: Folder name, ``str``
:param config_xml: New XML configuration, ``str``
:param domain_name: Domain name, default is '_', ``str``
'''
folder_url, short_name = self._get_job_folder(folder_name)
name = self._get_tag_text('id', config_xml)
if self.credential_exists(name, folder_name, domain_name):
raise JenkinsException('credential[%s] already exists '
'in the domain[%s] of [%s]'
% (name, domain_name, folder_name))
self.jenkins_open(requests.Request(
'POST', self._build_url(CREATE_CREDENTIAL, locals()),
data=config_xml.encode('utf-8'),
headers=DEFAULT_HEADERS
))
self.assert_credential_exists(name, folder_name, domain_name,
'create[%s] failed in the '
'domain[%s] of [%s]')
def delete_credential(self, name, folder_name, domain_name='_'):
'''Delete credential from domain of folder
:param name: Name of credentail, ``str``
:param folder_name: Folder name, ``str``
:param domain_name: Domain name, default is '_', ``str``
'''
folder_url, short_name = self._get_job_folder(folder_name)
self.jenkins_open(requests.Request(
'DELETE', self._build_url(CONFIG_CREDENTIAL, locals())
))
if self.credential_exists(name, folder_name, domain_name):
raise JenkinsException('delete credential[%s] from '
'domain[%s] of [%s] failed'
% (name, domain_name, folder_name))
def reconfig_credential(self, folder_name, config_xml, domain_name='_'):
'''Reconfig credential with new config in domain of folder
:param folder_name: Folder name, ``str``
:param config_xml: New XML configuration, ``str``
:param domain_name: Domain name, default is '_', ``str``
'''
folder_url, short_name = self._get_job_folder(folder_name)
name = self._get_tag_text('id', config_xml)
self.assert_credential_exists(name, folder_name, domain_name)
self.jenkins_open(requests.Request(
'POST', self._build_url(CONFIG_CREDENTIAL, locals())
))
def list_credentials(self, folder_name, domain_name='_'):
'''List credentials in domain of folder
:param folder_name: Folder name, ``str``
:param domain_name: Domain name, default is '_', ``str``
:returns: Credentials list, ``list``
'''
self.assert_folder(folder_name)
folder_url, short_name = self._get_job_folder(folder_name)
response = self.jenkins_open(requests.Request(
'GET', self._build_url(LIST_CREDENTIALS, locals())
))
return json.loads(response)['credentials']
def quiet_down(self):
'''Prepare Jenkins for shutdown.

353
tests/test_credential.py Normal file
View File

@ -0,0 +1,353 @@
import json
from mock import patch
import jenkins
from tests.base import JenkinsTestBase
class JenkinsCredentialTestBase(JenkinsTestBase):
config_xml = """<com.cloudbees.plugins.credentials.impl.UsernamePasswordCredentialsImpl>
<scope>GLOBAL</scope>
<id>Test Credential</id>
<username>Test-User</username>
<password>secret123</password>
</com.cloudbees.plugins.credentials.impl.UsernamePasswordCredentialsImpl>"""
class JenkinsGetTagTextTest(JenkinsCredentialTestBase):
def test_simple(self):
name_to_return = self.j._get_tag_text('id', self.config_xml)
self.assertEqual('Test Credential', name_to_return)
def test_failed(self):
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j._get_tag_text('id', '<xml></xml>')
self.assertEqual(str(context_manager.exception),
'tag[id] is invalidated')
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j._get_tag_text('id', '<xml><id></id></xml>')
self.assertEqual(str(context_manager.exception),
'tag[id] is invalidated')
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j._get_tag_text('id', '<xml><id> </id></xml>')
self.assertEqual(str(context_manager.exception),
'tag[id] is invalidated')
class JenkinsIsFolderTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_is_folder(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
]
self.assertTrue(self.j.is_folder('Test Folder'))
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_is_not_folder(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'org.jenkinsci.plugins.workflow.job.WorkflowJob'}),
]
self.assertFalse(self.j.is_folder('Test Job'))
class JenkinsAssertFolderTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_is_folder(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
]
self.j.assert_folder('Test Folder')
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_is_not_folder(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'org.jenkinsci.plugins.workflow.job.WorkflowJob'}),
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.assert_folder('Test Job')
self.assertEqual(str(context_manager.exception),
'job[Test Job] is not a folder')
class JenkinsAssertCredentialTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_credential_missing(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
jenkins.NotFoundException()
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.assert_credential_exists('NonExistent', 'TestFoler')
self.assertEqual(
str(context_manager.exception),
'credential[NonExistent] does not exist'
' in the domain[_] of [TestFoler]')
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_credential_exists(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'ExistingCredential'})
]
self.j.assert_credential_exists('ExistingCredential', 'TestFoler')
self._check_requests(jenkins_mock.call_args_list)
class JenkinsCredentialExistsTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_credential_missing(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
jenkins.NotFoundException()
]
self.assertEqual(self.j.credential_exists('NonExistent', 'TestFolder'),
False)
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_credential_exists(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'ExistingCredential'})
]
self.assertEqual(self.j.credential_exists('ExistingCredential',
'TestFolder'),
True)
self._check_requests(jenkins_mock.call_args_list)
class JenkinsGetCredentialInfoTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_simple(self, jenkins_mock):
credential_info_to_return = {'id': 'ExistingCredential'}
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps(credential_info_to_return)
]
credential_info = self.j.get_credential_info('ExistingCredential', 'TestFolder')
self.assertEqual(credential_info, credential_info_to_return)
self.assertEqual(
jenkins_mock.call_args[0][0].url,
self.make_url('job/TestFolder/credentials/store/folder/'
'domain/_/credential/ExistingCredential/api/json?depth=0'))
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_nonexistent(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
None,
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.get_credential_info('NonExistent', 'TestFolder')
self.assertEqual(
str(context_manager.exception),
'credential[NonExistent] does not exist '
'in the domain[_] of [TestFolder]')
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_invalid_json(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
'{invalid_json}'
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.get_credential_info('NonExistent', 'TestFolder')
self.assertEqual(
str(context_manager.exception),
'Could not parse JSON info for credential[NonExistent]'
' in the domain[_] of [TestFolder]')
class JenkinsGetCredentialConfigTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_encodes_credential_name(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
None,
]
self.j.get_credential_config(u'Test Credential', u'Test Folder')
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
'_/credential/Test%20Credential/config.xml'))
self._check_requests(jenkins_mock.call_args_list)
class JenkinsCreateCredentialTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_simple(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
jenkins.NotFoundException(),
None,
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'Test Credential'}),
]
self.j.create_credential('Test Folder', self.config_xml)
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/credential/Test%20Credential/api/json?depth=0'))
self.assertEqual(
jenkins_mock.call_args_list[2][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/createCredentials'))
self.assertEqual(
jenkins_mock.call_args_list[4][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/credential/Test%20Credential/api/json?depth=0'))
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_already_exists(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'Test Credential'}),
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.create_credential('Test Folder', self.config_xml)
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/credential/Test%20Credential/api/json?depth=0'))
self.assertEqual(
str(context_manager.exception),
'credential[Test Credential] already exists'
' in the domain[_] of [Test Folder]')
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_failed(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
jenkins.NotFoundException(),
None,
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
None,
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.create_credential('Test Folder', self.config_xml)
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/credential/Test%20Credential/api/json?depth=0'))
self.assertEqual(
jenkins_mock.call_args_list[2][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/'
'folder/domain/_/createCredentials'))
self.assertEqual(
jenkins_mock.call_args_list[4][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/credential/Test%20Credential/api/json?depth=0'))
self.assertEqual(
str(context_manager.exception),
'create[Test Credential] failed in the domain[_] of [Test Folder]')
self._check_requests(jenkins_mock.call_args_list)
class JenkinsDeleteCredentialTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_simple(self, jenkins_mock):
jenkins_mock.side_effect = [
True,
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
jenkins.NotFoundException(),
]
self.j.delete_credential(u'Test Credential', 'TestFolder')
self.assertEqual(
jenkins_mock.call_args_list[0][0][0].url,
self.make_url('job/TestFolder/credentials/store/folder/domain/'
'_/credential/Test%20Credential/config.xml'))
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_failed(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'id': 'ExistingCredential'}),
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'ExistingCredential'})
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.delete_credential(u'ExistingCredential', 'TestFolder')
self.assertEqual(
jenkins_mock.call_args_list[0][0][0].url,
self.make_url('job/TestFolder/credentials/store/folder/'
'domain/_/credential/ExistingCredential/config.xml'))
self.assertEqual(
str(context_manager.exception),
'delete credential[ExistingCredential] from '
'domain[_] of [TestFolder] failed')
self._check_requests(jenkins_mock.call_args_list)
class JenkinsReconfigCredentialTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_simple(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'Test Credential'}),
None
]
self.j.reconfig_credential(u'Test Folder', self.config_xml)
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
'_/credential/Test%20Credential/api/json?depth=0'))
self.assertEqual(
jenkins_mock.call_args_list[2][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
'_/credential/Test%20Credential/config.xml'))
self._check_requests(jenkins_mock.call_args_list)
class JenkinsListCredentialConfigTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_simple(self, jenkins_mock):
credentials_to_return = [{'id': 'Test Credential'}]
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'credentials': [{'id': 'Test Credential'}]}),
]
credentials = self.j.list_credentials(u'Test Folder')
self.assertEqual(credentials, credentials_to_return)
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
'_/api/json?tree=credentials[id]'))
self._check_requests(jenkins_mock.call_args_list)