Add a v2 API endpoint to reset the state of different scopes

A new endpoint is available to admin users on ``PUT /v2/scope``
with relatively similar parameters that are to be found on the
``GET /v2/scope`` endpoint regarding filtering. This allows
end users to reset the scope state of several scopes at once
if they are willing to.

Story: 2005395
Task: 30790
Change-Id: I28ccd24c65163b3e1b59e478653b01b84f2bb1b0
This commit is contained in:
Justin Ferrieu 2019-05-31 09:37:33 +02:00
parent b88c937891
commit d78ba8740e
12 changed files with 328 additions and 13 deletions

View File

@ -19,10 +19,15 @@ from werkzeug import exceptions as http_exceptions
from cloudkitty.api.v2 import base
from cloudkitty.api.v2 import utils as api_utils
from cloudkitty.common import policy
from cloudkitty import messaging
from cloudkitty import storage_state
from cloudkitty import utils as ck_utils
class ScopeState(base.BaseResource):
def __init__(self, *args, **kwargs):
super(ScopeState, self).__init__(*args, **kwargs)
self._client = messaging.get_client()
@api_utils.paginated
@api_utils.add_input_schema('query', {
@ -75,3 +80,59 @@ class ScopeState(base.BaseResource):
'state': str(r.state),
} for r in results]
}
@api_utils.add_input_schema('body', {
voluptuous.Exclusive('all_scopes', 'scope_selector'):
voluptuous.Boolean(),
voluptuous.Exclusive('scope_id', 'scope_selector'):
api_utils.MultiQueryParam(str),
voluptuous.Optional('scope_key', default=[]):
api_utils.MultiQueryParam(str),
voluptuous.Optional('fetcher', default=[]):
api_utils.MultiQueryParam(str),
voluptuous.Optional('collector', default=[]):
api_utils.MultiQueryParam(str),
voluptuous.Required('state'):
voluptuous.Coerce(ck_utils.iso2dt),
})
def put(self,
all_scopes=False,
scope_id=None,
scope_key=None,
fetcher=None,
collector=None,
state=None):
policy.authorize(
flask.request.context,
'scope:reset_state',
{'tenant_id': scope_id or flask.request.context.project_id}
)
if not all_scopes and scope_id is None:
raise http_exceptions.BadRequest(
"Either all_scopes or a scope_id should be specified.")
results = storage_state.StateManager().get_all(
identifier=scope_id,
scope_key=scope_key,
fetcher=fetcher,
collector=collector,
)
if len(results) < 1:
raise http_exceptions.NotFound(
"No resource found for provided filters.")
serialized_results = [{
'scope_id': r.identifier,
'scope_key': r.scope_key,
'fetcher': r.fetcher,
'collector': r.collector,
} for r in results]
self._client.cast({}, 'reset_state', res_data={
'scopes': serialized_results, 'state': ck_utils.dt2iso(state),
})
return {}, 202

View File

@ -24,6 +24,12 @@ scope_policies = [
description='Get the state of one or several scopes',
operations=[{'path': '/v2/scope',
'method': 'GET'}]),
policy.DocumentedRuleDefault(
name='scope:reset_state',
check_str=base.ROLE_ADMIN,
description='Reset the state of one or several scopes',
operations=[{'path': '/v2/scope',
'method': 'PUT'}]),
]

View File

@ -71,9 +71,19 @@ COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends'
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
def get_lock(coord, tenant_id):
name = hashlib.sha256(
("cloudkitty-"
+ str(tenant_id + '-')
+ str(CONF.collect.collector + '-')
+ str(CONF.fetcher.backend + '-')
+ str(CONF.collect.scope_key)).encode('ascii')).hexdigest()
return name, coord.get_lock(name.encode('ascii'))
class RatingEndpoint(object):
target = oslo_messaging.Target(namespace='rating',
version='1.1')
version='1.0')
def __init__(self, orchestrator):
self._global_reload = False
@ -128,6 +138,56 @@ class RatingEndpoint(object):
self._pending_reload.remove(name)
class ScopeEndpoint(object):
target = oslo_messaging.Target(version='1.0')
def __init__(self):
self._coord = coordination.get_coordinator(
CONF.orchestrator.coordination_url,
uuidutils.generate_uuid().encode('ascii'))
self._state = state.StateManager()
self._storage = storage.get_storage()
self._coord.start(start_heart=True)
def reset_state(self, ctxt, res_data):
LOG.info('Received state reset command. {}'.format(res_data))
random.shuffle(res_data['scopes'])
for scope in res_data['scopes']:
lock_name, lock = get_lock(self._coord, scope['scope_id'])
LOG.debug(
'[ScopeEndpoint] Trying to acquire lock "{}" ...'.format(
lock_name,
)
)
if lock.acquire(blocking=True):
LOG.debug(
'[ScopeEndpoint] Acquired lock "{}".'.format(
lock_name,
)
)
state_dt = ck_utils.iso2dt(res_data['state'])
try:
self._storage.delete(begin=state_dt, end=None, filters={
scope['scope_key']: scope['scope_id'],
'collector': scope['collector'],
'fetcher': scope['fetcher'],
})
self._state.set_state(
scope['scope_id'],
state_dt,
fetcher=scope['fetcher'],
collector=scope['collector'],
scope_key=scope['scope_key'],
)
finally:
lock.release()
LOG.debug(
'[ScopeEndpoint] Released lock "{}" .'.format(
lock_name,
)
)
class BaseWorker(object):
def __init__(self, tenant_id=None):
self._tenant_id = tenant_id
@ -274,6 +334,7 @@ class Orchestrator(cotyledon.Service):
# RPC
self.server = None
self._rating_endpoint = RatingEndpoint(self)
self._scope_endpoint = ScopeEndpoint()
self._init_messaging()
# DLM
@ -282,21 +343,13 @@ class Orchestrator(cotyledon.Service):
uuidutils.generate_uuid().encode('ascii'))
self.coord.start(start_heart=True)
def _lock(self, tenant_id):
name = hashlib.sha256(
("cloudkitty-"
+ str(tenant_id + '-')
+ str(CONF.collect.collector + '-')
+ str(CONF.fetcher.backend + '-')
+ str(CONF.collect.scope_key)).encode('ascii')).hexdigest()
return name, self.coord.get_lock(name)
def _init_messaging(self):
target = oslo_messaging.Target(topic='cloudkitty',
server=CONF.host,
version='1.0')
endpoints = [
self._rating_endpoint,
self._scope_endpoint,
]
self.server = messaging.get_server(target, endpoints)
self.server.start()
@ -324,7 +377,7 @@ class Orchestrator(cotyledon.Service):
for tenant_id in self.tenants:
lock_name, lock = self._lock(tenant_id)
lock_name, lock = get_lock(self.coord, tenant_id)
LOG.debug(
'[Worker: {w}] Trying to acquire lock "{l}" ...'.format(
w=self._worker_id, l=lock_name)

View File

@ -78,7 +78,6 @@ class StateManager(object):
r = q.all()
session.close()
return r
def _get_db_item(self, session, identifier,
@ -124,6 +123,7 @@ class StateManager(object):
session.begin()
r = self._get_db_item(
session, identifier, fetcher, collector, scope_key)
if r and r.state != state:
r.state = state
session.commit()
@ -137,6 +137,7 @@ class StateManager(object):
)
session.add(state_object)
session.commit()
session.close()
def get_state(self, identifier,

View File

@ -271,6 +271,16 @@ class BaseFakeRPC(fixture.GabbiFixture):
self.server.stop()
class ScopeStateResetFakeRPC(BaseFakeRPC):
class FakeRPCEndpoint(object):
target = oslo_messaging.Target(version='1.0')
def reset_state(self, ctxt, res_data):
pass
endpoint = FakeRPCEndpoint
class QuoteFakeRPC(BaseFakeRPC):
class FakeRPCEndpoint(object):
target = oslo_messaging.Target(namespace='rating',

View File

@ -111,3 +111,61 @@ tests:
status: 404
query_parameters:
scope_key: nope
- name: Reset states of all scopes
url: /v2/scope
method: PUT
status: 202
request_headers:
content-type: application/json
data:
state: 20190716T085501Z
all_scopes: true
- name: Reset one scope state
url: /v2/scope
method: PUT
status: 202
request_headers:
content-type: application/json
data:
state: 20190716T085501Z
scope_id: aaaa
- name: Reset several scope states
url: /v2/scope
method: PUT
status: 202
request_headers:
content-type: application/json
data:
state: 20190716T085501Z
scope_id: aaaa
scope_id: bbbb
- name: Reset state with no scope_id or all_scopes
url: /v2/scope
method: PUT
status: 400
request_headers:
content-type: application/json
data:
scope_key: key1
state: 20190716T085501Z
response_strings:
- "Either all_scopes or a scope_id should be specified."
- name: Reset state with no params
url: /v2/scope
method: PUT
status: 400
- name: Reset state with no results for parameters
url: /v2/scope
method: PUT
status: 404
request_headers:
content-type: application/json
data:
state: 20190716T085501Z
scope_id: foobar

View File

@ -15,13 +15,18 @@
#
# @author: Stéphane Albert
#
import datetime
import mock
from oslo_messaging import conffixture
from stevedore import extension
from cloudkitty import collector
from cloudkitty import orchestrator
from cloudkitty.storage.v2 import influx
from cloudkitty import storage_state
from cloudkitty import tests
from tooz import coordination
from tooz.drivers import file
class FakeKeystoneClient(object):
@ -34,6 +39,77 @@ class FakeKeystoneClient(object):
tenants = FakeTenants()
class ScopeEndpointTest(tests.TestCase):
def setUp(self):
super(ScopeEndpointTest, self).setUp()
messaging_conf = self.useFixture(conffixture.ConfFixture(self.conf))
messaging_conf.transport_url = 'fake:/'
self.conf.set_override('backend', 'influxdb', 'storage')
def test_reset_state(self):
coord_start_patch = mock.patch.object(
coordination.CoordinationDriverWithExecutor, 'start')
lock_acquire_patch = mock.patch.object(
file.FileLock, 'acquire', return_value=True)
storage_delete_patch = mock.patch.object(
influx.InfluxStorage, 'delete')
state_set_patch = mock.patch.object(
storage_state.StateManager, 'set_state')
with coord_start_patch, lock_acquire_patch, \
storage_delete_patch as sd, state_set_patch as ss:
endpoint = orchestrator.ScopeEndpoint()
endpoint.reset_state({}, {
'scopes': [
{
'scope_id': 'f266f30b11f246b589fd266f85eeec39',
'scope_key': 'project_id',
'collector': 'prometheus',
'fetcher': 'prometheus',
},
{
'scope_id': '4dfb25b0947c4f5481daf7b948c14187',
'scope_key': 'project_id',
'collector': 'gnocchi',
'fetcher': 'gnocchi',
},
],
'state': '20190716T085501Z',
})
sd.assert_has_calls([
mock.call(
begin=datetime.datetime(2019, 7, 16, 8, 55, 1),
end=None,
filters={
'project_id': 'f266f30b11f246b589fd266f85eeec39',
'collector': 'prometheus',
'fetcher': 'prometheus'}),
mock.call(
begin=datetime.datetime(2019, 7, 16, 8, 55, 1),
end=None,
filters={
'project_id': '4dfb25b0947c4f5481daf7b948c14187',
'collector': 'gnocchi',
'fetcher': 'gnocchi'})], any_order=True)
ss.assert_has_calls([
mock.call(
'f266f30b11f246b589fd266f85eeec39',
datetime.datetime(2019, 7, 16, 8, 55, 1),
scope_key='project_id',
collector='prometheus',
fetcher='prometheus'),
mock.call(
'4dfb25b0947c4f5481daf7b948c14187',
datetime.datetime(2019, 7, 16, 8, 55, 1),
scope_key='project_id',
collector='gnocchi',
fetcher='gnocchi')], any_order=True)
class OrchestratorTest(tests.TestCase):
def setUp(self):
super(OrchestratorTest, self).setUp()

View File

@ -88,6 +88,10 @@
# GET /v2/scope
#"scope:get_state": "role:admin"
# Reset the state of one or several scopes
# PUT /v2/scope
#"scope:reset_state": "role:admin"
# Get a rating summary
# GET /v2/summary
#"summary:get_summary": "rule:admin_or_owner"

View File

@ -4,6 +4,9 @@
201:
default: Resource was successfully created.
202:
default: Request has been accepted for asynchronous processing.
400:
default: Invalid request.

View File

@ -48,3 +48,34 @@ Response Example
.. literalinclude:: ./api_samples/scope/scope_get.json
:language: javascript
Reset the status of several scopes
==================================
Reset the status of several scopes.
.. rest_method:: PUT /v2/scope
.. rest_parameters:: scope/scope_parameters.yml
- state: state
- collector: collector
- fetcher: fetcher
- scope_id: scope_id
- scope_key: scope_key
- all_scopes: all_scopes
Status codes
------------
.. rest_status_code:: success http_status.yml
- 202
.. rest_status_code:: error http_status.yml
- 400
- 403
- 404
- 405

View File

@ -40,11 +40,17 @@ scope_key: &scope_key
type: string
required: false
all_scopes: &all_scopes
in: body
description: |
Confirmation whether all scopes must be reset
type: bool
state:
in: body
description: |
State of the scope.
type: string
type: iso8601 timestamp
required: true
fetcher_resp:

View File

@ -0,0 +1,6 @@
---
features:
- |
Added a v2 API endpoint allowing to reset the state of several scopes.
This endpoint is available via a ``PUT`` request on ``/v2/scope`` and
supports filters. Admin privileges are required to use this endpoint.