Add active status fields in the storage state table

This patch adds active status fields in the storage state table
(cloudkitty_storage_states). A boolean column called "active",
which indicates if the CloudKitty scope is active for billing, and
another one called "scope_activation_toggle_date" (timestamp field)
to store the latest timestamp when the scope moved between the
active/deactivated states. Then, during CloudKitty processing, we
check the "active" column. If the resource is not active, we ignore
it during the processing.

Moreover, we introduce an API to allow operators to set the "active" field.
The "scope_activation_toggle_date" will not be exposed for operators to
change it. It is updated automatically according to the changes in the "active"
field.

This patch adds a new HTTP method to "/v2/scope" endpoint. We then use
"patch" HTTP method to allow operators to patch a storage scope. The API
will require the scope_id, and then, it takes into account some of the fields
we allow operators to change, and "active" field is one of them.

Change-Id: Ia02c2eeb98021c60549cb8deab6f2e964e573f1e
Implements: https://review.opendev.org/c/openstack/cloudkitty-specs/+/770928/
This commit is contained in:
Rafael Weingärtner 2021-02-24 14:48:24 -03:00
parent 70114c829e
commit 2acaa5a4a3
13 changed files with 398 additions and 31 deletions

View File

@ -47,6 +47,8 @@ class ScopeState(base.BaseResource):
api_utils.MultiQueryParam(str),
voluptuous.Optional('collector', default=[]):
api_utils.MultiQueryParam(str),
voluptuous.Optional('active', default=[]):
api_utils.MultiQueryParam(int),
})
@api_utils.add_output_schema({'results': [{
voluptuous.Required('scope_id'): vutils.get_string_type(),
@ -57,14 +59,12 @@ class ScopeState(base.BaseResource):
'last_processed_timestamp'): vutils.get_string_type(),
# This "state" property should be removed in the next release.
voluptuous.Optional('state'): vutils.get_string_type(),
voluptuous.Required('active'): bool,
voluptuous.Optional('scope_activation_toggle_date'):
vutils.get_string_type(),
}]})
def get(self,
offset=0,
limit=100,
scope_id=None,
scope_key=None,
fetcher=None,
collector=None):
def get(self, offset=0, limit=100, scope_id=None, scope_key=None,
fetcher=None, collector=None, active=None):
policy.authorize(
flask.request.context,
@ -72,13 +72,9 @@ class ScopeState(base.BaseResource):
{'project_id': scope_id or flask.request.context.project_id}
)
results = self._storage_state.get_all(
identifier=scope_id,
scope_key=scope_key,
fetcher=fetcher,
collector=collector,
offset=offset,
limit=limit,
)
identifier=scope_id, scope_key=scope_key, fetcher=fetcher,
collector=collector, offset=offset, limit=limit, active=active)
if len(results) < 1:
raise http_exceptions.NotFound(
"No resource found for provided filters.")
@ -91,6 +87,10 @@ class ScopeState(base.BaseResource):
'state': r.last_processed_timestamp.isoformat(),
'last_processed_timestamp':
r.last_processed_timestamp.isoformat(),
'active': r.active,
'scope_activation_toggle_date':
r.scope_activation_toggle_date.isoformat() if
r.scope_activation_toggle_date else None
} for r in results]
}
@ -165,3 +165,67 @@ class ScopeState(base.BaseResource):
})
return {}, 202
@api_utils.add_input_schema('body', {
voluptuous.Required('scope_id'):
api_utils.SingleQueryParam(str),
voluptuous.Optional('scope_key'):
api_utils.SingleQueryParam(str),
voluptuous.Optional('fetcher'):
api_utils.SingleQueryParam(str),
voluptuous.Optional('collector'):
api_utils.SingleQueryParam(str),
voluptuous.Optional('active'):
api_utils.SingleQueryParam(bool),
})
@api_utils.add_output_schema({
voluptuous.Required('scope_id'): vutils.get_string_type(),
voluptuous.Required('scope_key'): vutils.get_string_type(),
voluptuous.Required('fetcher'): vutils.get_string_type(),
voluptuous.Required('collector'): vutils.get_string_type(),
voluptuous.Required('state'): vutils.get_string_type(),
voluptuous.Required('active'): bool,
voluptuous.Required('scope_activation_toggle_date'):
vutils.get_string_type()
})
def patch(self, scope_id, scope_key=None, fetcher=None,
collector=None, active=None):
policy.authorize(
flask.request.context,
'scope:patch_state',
{'tenant_id': scope_id or flask.request.context.project_id}
)
results = self._storage_state.get_all(identifier=scope_id)
if len(results) < 1:
raise http_exceptions.NotFound(
"No resource found for provided filters.")
if len(results) > 1:
LOG.debug("Too many resources found with the same scope_id [%s], "
"scopes found: [%s].", scope_id, results)
raise http_exceptions.NotFound("Too many resources found with "
"the same scope_id: %s." % scope_id)
scope_to_update = results[0]
LOG.debug("Executing update of storage scope: [%s].", scope_to_update)
self._storage_state.update_storage_scope(scope_to_update,
scope_key=scope_key,
fetcher=fetcher,
collector=collector,
active=active)
storage_scopes = self._storage_state.get_all(identifier=scope_id)
update_storage_scope = storage_scopes[0]
return {
'scope_id': update_storage_scope.identifier,
'scope_key': update_storage_scope.scope_key,
'fetcher': update_storage_scope.fetcher,
'collector': update_storage_scope.collector,
'state': update_storage_scope.state.isoformat(),
'active': update_storage_scope.active,
'scope_activation_toggle_date':
update_storage_scope.scope_activation_toggle_date.isoformat()
}

View File

@ -30,6 +30,13 @@ scope_policies = [
description='Reset the state of one or several scopes',
operations=[{'path': '/v2/scope',
'method': 'PUT'}]),
policy.DocumentedRuleDefault(
name='scope:patch_state',
check_str=base.ROLE_ADMIN,
description='Enables operators to patch a storage scope',
operations=[{'path': '/v2/scope',
'method': 'PATCH'}]),
]

View File

@ -309,18 +309,42 @@ class Worker(BaseWorker):
def run(self):
while True:
timestamp = self._check_state()
LOG.debug("Processing timestamp [%s] for storage scope [%s].",
timestamp, self._tenant_id)
if not timestamp:
break
if self._state.get_state(self._tenant_id):
if not self._state.is_storage_scope_active(self._tenant_id):
LOG.debug("Skipping processing for storage scope [%s] "
"because it is marked as inactive.",
self._tenant_id)
break
else:
LOG.debug("No need to check if [%s] is de-activated. "
"We have never processed it before.")
if not self._state.is_storage_scope_active(self._tenant_id):
LOG.debug("Skipping processing for storage scope [%s] because "
"it is marked as inactive.", self._tenant_id)
break
metrics = list(self._conf['metrics'].keys())
# Collection
usage_data = self._do_collection(metrics, timestamp)
LOG.debug("Usage data [%s] found for storage scope [%s] in "
"timestamp [%s].", usage_data, self._tenant_id,
timestamp)
start_time = timestamp
end_time = tzutils.add_delta(timestamp,
timedelta(seconds=self._period))
frame = dataframe.DataFrame(
start=timestamp,
end=tzutils.add_delta(timestamp,
timedelta(seconds=self._period)),
start=start_time,
end=end_time,
usage=usage_data,
)
# Rating
@ -328,6 +352,10 @@ class Worker(BaseWorker):
frame = processor.obj.process(frame)
# Writing
LOG.debug("Persisting processed frames [%s] for tenant [%s] and "
"time [start=%s,end=%s]", frame, self._tenant_id,
start_time, end_time)
self._storage.push([frame], self._tenant_id)
self._state.set_state(self._tenant_id, timestamp)
@ -400,6 +428,8 @@ class Orchestrator(cotyledon.Service):
w=self._worker_id, lck=lock_name)
)
state = self._check_state(tenant_id)
LOG.debug("Next timestamp [%s] found for processing for "
"storage scope [%s].", state, tenant_id)
if state:
worker = Worker(
self.collector,
@ -411,6 +441,8 @@ class Orchestrator(cotyledon.Service):
lock.release()
LOG.debug("Finished processing all storage scopes.")
# FIXME(sheeprine): We may cause a drift here
time.sleep(CONF.collect.period)

View File

@ -33,6 +33,20 @@ CONF.import_opt('collector', 'cloudkitty.collector', 'collect')
CONF.import_opt('scope_key', 'cloudkitty.collector', 'collect')
def to_list_if_needed(value):
if not isinstance(value, list):
value = [value]
return value
def apply_offset_and_limit(limit, offset, q):
if offset:
q = q.offset(offset)
if limit:
q = q.limit(limit)
return q
class StateManager(object):
"""Class allowing state management in CloudKitty"""
@ -43,7 +57,9 @@ class StateManager(object):
fetcher=None,
collector=None,
scope_key=None,
limit=100, offset=0):
active=1,
limit=100,
offset=0):
"""Returns the state of all scopes.
This function returns the state of all scopes with support for optional
@ -59,20 +75,33 @@ class StateManager(object):
:type fetcher: list
:param scope_key: optional scope_keys to filter on
:type scope_key: list
:param active: optional active to filter scopes by status
(active/deactivated)
:type active: int
:param limit: optional to restrict the projection
:type limit: int
:param offset: optional to shift the projection
:type offset: int
"""
session = db.get_session()
session.begin()
q = utils.model_query(self.model, session)
if identifier:
q = q.filter(self.model.identifier.in_(identifier))
q = q.filter(
self.model.identifier.in_(to_list_if_needed(identifier)))
if fetcher:
q = q.filter(self.model.fetcher.in_(fetcher))
q = q.filter(
self.model.fetcher.in_(to_list_if_needed(fetcher)))
if collector:
q = q.filter(self.model.collector.in_(collector))
q = q.filter(
self.model.collector.in_(to_list_if_needed(collector)))
if scope_key:
q = q.filter(self.model.scope_key.in_(scope_key))
q = q.offset(offset).limit(limit)
q = q.filter(
self.model.scope_key.in_(to_list_if_needed(scope_key)))
if active is not None and active != []:
q = q.filter(self.model.active.in_(to_list_if_needed(active)))
q = apply_offset_and_limit(limit, offset, q)
r = q.all()
session.close()
@ -80,7 +109,8 @@ class StateManager(object):
for item in r:
item.last_processed_timestamp = tzutils.utc_to_local(
item.last_processed_timestamp)
item.scope_activation_toggle_date = tzutils.utc_to_local(
item.scope_activation_toggle_date)
return r
def _get_db_item(self, session, identifier,
@ -211,3 +241,65 @@ class StateManager(object):
q = utils.model_query(self.model, session)
session.close()
return [tenant.identifier for tenant in q]
def update_storage_scope(self, storage_scope_to_update, scope_key=None,
fetcher=None, collector=None, active=None):
"""Update storage scope data.
:param storage_scope_to_update: The storage scope to update in the DB
:type storage_scope_to_update: object
:param fetcher: Fetcher associated to the scope
:type fetcher: str
:param collector: Collector associated to the scope
:type collector: str
:param scope_key: scope_key associated to the scope
:type scope_key: str
:param active: indicates if the storage scope is active for processing
:type active: bool
"""
session = db.get_session()
session.begin()
db_scope = self._get_db_item(session,
storage_scope_to_update.identifier,
storage_scope_to_update.fetcher,
storage_scope_to_update.collector,
storage_scope_to_update.scope_key)
if scope_key:
db_scope.scope_key = scope_key
if fetcher:
db_scope.fetcher = fetcher
if collector:
db_scope.collector = collector
if active is not None and active != db_scope.active:
db_scope.active = active
now = tzutils.localized_now()
db_scope.scope_activation_toggle_date = tzutils.local_to_utc(
now, naive=True)
session.commit()
session.close()
def is_storage_scope_active(self, identifier, fetcher=None,
collector=None, scope_key=None):
"""Checks if a storage scope is active
:param identifier: Identifier of the scope
:type identifier: str
:param fetcher: Fetcher associated to the scope
:type fetcher: str
:param collector: Collector associated to the scope
:type collector: str
:param scope_key: scope_key associated to the scope
:type scope_key: str
:rtype: datetime.datetime
"""
session = db.get_session()
session.begin()
r = self._get_db_item(
session, identifier, fetcher, collector, scope_key)
session.close()
return r.active

View File

@ -0,0 +1,50 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Update storage state constraint
Revision ID: 4d69395f
Revises: 750d3050cf71
Create Date: 2019-05-15 17:02:56.595274
"""
import importlib
import sqlalchemy
from alembic import op
# revision identifiers, used by Alembic.
revision = '4d69395f'
down_revision = '750d3050cf71'
def upgrade():
down_version_module = importlib.import_module(
"cloudkitty.storage_state.alembic.versions."
"750d3050_create_last_processed_timestamp_column")
for name, table in down_version_module.Base.metadata.tables.items():
if name == 'cloudkitty_storage_states':
with op.batch_alter_table(name,
copy_from=table,
recreate='always') as batch_op:
batch_op.alter_column('identifier')
batch_op.add_column(
sqlalchemy.Column('scope_activation_toggle_date',
sqlalchemy.DateTime, nullable=False,
server_default=sqlalchemy.sql.func.now())
)
batch_op.add_column(
sqlalchemy.Column('active', sqlalchemy.Boolean,
nullable=False, default=True))
break

View File

@ -21,6 +21,11 @@ Create Date: 2021-02-08 17:00:00.000
"""
from alembic import op
import sqlalchemy
from sqlalchemy.ext import declarative
from oslo_db.sqlalchemy import models
from cloudkitty.storage_state.alembic.versions import \
c50ed2c19204_update_storage_state_constraint as down_version_module
@ -30,6 +35,8 @@ down_revision = 'c50ed2c19204'
branch_labels = None
depends_on = None
Base = declarative.declarative_base()
def upgrade():
for name, table in down_version_module.Base.metadata.tables.items():
@ -41,3 +48,37 @@ def upgrade():
'state', new_column_name='last_processed_timestamp')
break
class IdentifierTableForThisDataBaseModelChangeSet(Base, models.ModelBase):
"""Represents the state of a given identifier."""
@declarative.declared_attr
def __table_args__(cls):
return (
sqlalchemy.schema.UniqueConstraint(
'identifier',
'scope_key',
'collector',
'fetcher',
name='uq_cloudkitty_storage_states_identifier'),
)
__tablename__ = 'cloudkitty_storage_states'
id = sqlalchemy.Column(sqlalchemy.Integer,
primary_key=True)
identifier = sqlalchemy.Column(sqlalchemy.String(256),
nullable=False,
unique=False)
scope_key = sqlalchemy.Column(sqlalchemy.String(40),
nullable=True,
unique=False)
fetcher = sqlalchemy.Column(sqlalchemy.String(40),
nullable=True,
unique=False)
collector = sqlalchemy.Column(sqlalchemy.String(40),
nullable=True,
unique=False)
last_processed_timestamp = sqlalchemy.Column(
sqlalchemy.DateTime, nullable=False)

View File

@ -20,10 +20,11 @@ Create Date: 2019-05-15 17:02:56.595274
"""
from alembic import op
from sqlalchemy.ext import declarative
from oslo_db.sqlalchemy import models
import sqlalchemy
from sqlalchemy.ext import declarative
# revision identifiers, used by Alembic.
revision = 'c50ed2c19204'

View File

@ -16,8 +16,6 @@
from oslo_db.sqlalchemy import models
import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import schema
Base = declarative.declarative_base()
@ -28,7 +26,7 @@ class IdentifierState(Base, models.ModelBase):
@declarative.declared_attr
def __table_args__(cls):
return (
schema.UniqueConstraint(
sqlalchemy.schema.UniqueConstraint(
'identifier',
'scope_key',
'collector',
@ -54,3 +52,8 @@ class IdentifierState(Base, models.ModelBase):
unique=False)
last_processed_timestamp = sqlalchemy.Column(
sqlalchemy.DateTime, nullable=False)
scope_activation_toggle_date = sqlalchemy.Column(
'scope_activation_toggle_date', sqlalchemy.DateTime, nullable=False,
server_default=sqlalchemy.sql.func.now())
active = sqlalchemy.Column('active', sqlalchemy.Boolean, nullable=False,
default=True)

View File

@ -97,6 +97,10 @@
# PUT /v2/scope
#"scope:reset_state": "role:admin"
# Enables operators to patch a storage scope
# PATCH /v2/scope
#"scope:patch_state": "role:admin"
# Get a rating summary
# GET /v2/summary
#"summary:get_summary": "rule:admin_or_owner"

View File

@ -5,21 +5,24 @@
"fetcher": "keystone",
"scope_id": "7a7e5183264644a7a79530eb56e59941",
"scope_key": "project_id",
"last_processed_timestamp": "2019-05-09 10:00:00"
"last_processed_timestamp": "2019-05-09 10:00:00",
"active": true
},
{
"collector": "gnocchi",
"fetcher": "keystone",
"scope_id": "9084fadcbd46481788e0ad7405dcbf12",
"scope_key": "project_id",
"last_processed_timestamp": "2019-05-08 03:00:00"
"last_processed_timestamp": "2019-05-08 03:00:00",
"active": true
},
{
"collector": "gnocchi",
"fetcher": "keystone",
"scope_id": "1f41d183fca5490ebda5c63fbaca026a",
"scope_key": "project_id",
"last_processed_timestamp": "2019-05-06 22:00:00"
"last_processed_timestamp": "2019-05-06 22:00:00",
"active": true
}
]
}

View File

@ -43,6 +43,7 @@ Response
- last_processed_timestamp: last_processed_timestamp
- scope_id: scope_id_resp
- scope_key: scope_key_resp
- active: active_key_resp
Response Example
----------------
@ -81,3 +82,53 @@ Status codes
- 403
- 404
- 405
Patch a scope
================================
Patches/updates a scope.
.. rest_method:: PATCH /v2/scope
.. rest_parameters:: scope/scope_parameters.yml
- collector: collector
- fetcher: fetcher
- limit: limit
- offset: offset
- scope_id: scope_id
- scope_key: scope_key
- active: active_body
Status codes
------------
.. rest_status_code:: success http_status.yml
- 200
.. rest_status_code:: error http_status.yml
- 400
- 403
- 404
- 405
Response
--------
.. rest_parameters:: scope/scope_parameters.yml
- collector: collector_resp
- fetcher: fetcher_resp
- state: state
- scope_id: scope_id_resp
- scope_key: scope_key_resp
- active: active_key_resp
Response Example
----------------
.. literalinclude:: ./api_samples/scope/scope_get.json
:language: javascript

View File

@ -40,6 +40,21 @@ scope_key: &scope_key
type: string
required: false
active_anchor_query: &active_query
in: body
description: |
Defines if a scope should be processed or not; `True` means that
CloudKitty must process the scope.
type: bool
required: true
active_body:
<<: *active_query
required: false
active_key_resp:
<<: *active_query
all_scopes: &all_scopes
in: body
description: |

View File

@ -0,0 +1,4 @@
---
features:
- |
Add active status option in the storage state table and API.