compute: Add Server.clear_password action

Change-Id: I5960605944fef2a300d6a3a9ff723a701e32cb64
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane 2024-05-09 12:51:00 +01:00
parent 916f9af658
commit 67cd66688d
6 changed files with 110 additions and 4 deletions

View File

@ -851,6 +851,17 @@ class Proxy(proxy.Proxy):
server = self._get_resource(_server.Server, server)
return server.get_password(self)
def clear_server_password(self, server):
"""Clear the administrator password
:param server: Either the ID of a server or a
:class:`~openstack.compute.v2.server.Server` instance.
:returns: None
"""
server = self._get_resource(_server.Server, server)
server.clear_password(self)
def reset_server_state(self, server, state):
"""Reset the state of server

View File

@ -323,7 +323,7 @@ class Server(resource.Resource, metadata.MetadataMixin, tag.TagMixin):
exceptions.raise_from_response(response)
return response
def change_password(self, session, password):
def change_password(self, session, password, *, microversion=None):
"""Change the administrator password to the given password.
:param session: The session to use for making this request.
@ -331,22 +331,40 @@ class Server(resource.Resource, metadata.MetadataMixin, tag.TagMixin):
:returns: None
"""
body = {'changePassword': {'adminPass': password}}
self._action(session, body)
self._action(session, body, microversion=microversion)
def get_password(self, session):
def get_password(self, session, *, microversion=None):
"""Get the encrypted administrator password.
:param session: The session to use for making this request.
:returns: The encrypted administrator password.
"""
url = utils.urljoin(Server.base_path, self.id, 'os-server-password')
if microversion is None:
microversion = self._get_microversion(session, action='commit')
response = session.get(url)
response = session.get(url, microversion=microversion)
exceptions.raise_from_response(response)
data = response.json()
return data.get('password')
def clear_password(self, session, *, microversion=None):
"""Clear the administrator password.
This removes the password from the database. It does not actually
change the server password.
:param session: The session to use for making this request.
:returns: None
"""
url = utils.urljoin(Server.base_path, self.id, 'os-server-password')
if microversion is None:
microversion = self._get_microversion(session, action='commit')
response = session.delete(url, microversion=microversion)
exceptions.raise_from_response(response)
def reboot(self, session, reboot_type):
"""Reboot server where reboot_type might be 'SOFT' or 'HARD'.

View File

@ -961,6 +961,30 @@ class TestCompute(TestComputeProxy):
def test_server_update(self):
self.verify_update(self.proxy.update_server, server.Server)
def test_server_change_password(self):
self._verify(
"openstack.compute.v2.server.Server.change_password",
self.proxy.change_server_password,
method_args=["value", "password"],
expected_args=[self.proxy, "password"],
)
def test_server_get_password(self):
self._verify(
"openstack.compute.v2.server.Server.get_password",
self.proxy.get_server_password,
method_args=["value"],
expected_args=[self.proxy],
)
def test_server_clear_password(self):
self._verify(
"openstack.compute.v2.server.Server.clear_password",
self.proxy.clear_server_password,
method_args=["value"],
expected_args=[self.proxy],
)
def test_server_wait_for(self):
value = server.Server(id='1234')
self.verify_wait_for_status(

View File

@ -10,12 +10,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import http
from unittest import mock
from openstack.compute.v2 import flavor
from openstack.compute.v2 import server
from openstack.image.v2 import image
from openstack.tests.unit import base
from openstack.tests.unit import fakes
IDENTIFIER = 'IDENTIFIER'
EXAMPLE = {
@ -317,6 +319,33 @@ class TestServer(base.TestCase):
microversion=self.sess.default_microversion,
)
def test_get_password(self):
sot = server.Server(**EXAMPLE)
self.sess.get.return_value = fakes.FakeResponse(
data={'password': 'foo'}
)
result = sot.get_password(self.sess)
self.assertEqual('foo', result)
url = 'servers/IDENTIFIER/os-server-password'
self.sess.get.assert_called_with(
url, microversion=self.sess.default_microversion
)
def test_clear_password(self):
sot = server.Server(**EXAMPLE)
self.sess.delete.return_value = fakes.FakeResponse(
status_code=http.HTTPStatus.NO_CONTENT,
)
self.assertIsNone(sot.clear_password(self.sess))
url = 'servers/IDENTIFIER/os-server-password'
self.sess.delete.assert_called_with(
url, microversion=self.sess.default_microversion
)
def test_reboot(self):
sot = server.Server(**EXAMPLE)

View File

@ -13,8 +13,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
from unittest import mock
import requests
class FakeTransport(mock.Mock):
RESPONSE = mock.Mock('200 OK')
@ -35,3 +38,19 @@ class FakeAuthenticator(mock.Mock):
self.get_token.return_value = self.TOKEN
self.get_endpoint = mock.Mock()
self.get_endpoint.return_value = self.ENDPOINT
class FakeResponse(requests.Response):
def __init__(
self, headers=None, status_code=200, data=None, encoding=None
):
super().__init__()
headers = headers or {}
self.status_code = status_code
self.headers.update(headers)
self._content = json.dumps(data)
if not isinstance(self._content, bytes):
self._content = self._content.encode()

View File

@ -0,0 +1,5 @@
---
features:
- |
The ``Server.clear_password`` and equivalent ``clear_server_password``
proxy method have been added.