diff --git a/cloudkitty/api/v2/scope/state.py b/cloudkitty/api/v2/scope/state.py index fe46bbae..0c2f13d0 100644 --- a/cloudkitty/api/v2/scope/state.py +++ b/cloudkitty/api/v2/scope/state.py @@ -24,6 +24,10 @@ from cloudkitty import storage_state from cloudkitty.utils import tz as tzutils from cloudkitty.utils import validation as vutils +from oslo_log import log + +LOG = log.getLogger(__name__) + class ScopeState(base.BaseResource): @@ -49,7 +53,10 @@ class ScopeState(base.BaseResource): voluptuous.Required('scope_key'): vutils.get_string_type(), voluptuous.Required('fetcher'): vutils.get_string_type(), voluptuous.Required('collector'): vutils.get_string_type(), - voluptuous.Required('state'): vutils.get_string_type(), + voluptuous.Optional( + 'last_processed_timestamp'): vutils.get_string_type(), + # This "state" property should be removed in the next release. + voluptuous.Optional('state'): vutils.get_string_type(), }]}) def get(self, offset=0, @@ -81,7 +88,9 @@ class ScopeState(base.BaseResource): 'scope_key': r.scope_key, 'fetcher': r.fetcher, 'collector': r.collector, - 'state': r.state.isoformat(), + 'state': r.last_processed_timestamp.isoformat(), + 'last_processed_timestamp': + r.last_processed_timestamp.isoformat(), } for r in results] } @@ -96,7 +105,10 @@ class ScopeState(base.BaseResource): api_utils.MultiQueryParam(str), voluptuous.Optional('collector', default=[]): api_utils.MultiQueryParam(str), - voluptuous.Required('state'): + voluptuous.Optional('last_processed_timestamp'): + voluptuous.Coerce(tzutils.dt_from_iso), + # This "state" property should be removed in the next release. + voluptuous.Optional('state'): voluptuous.Coerce(tzutils.dt_from_iso), }) def put(self, @@ -105,6 +117,7 @@ class ScopeState(base.BaseResource): scope_key=None, fetcher=None, collector=None, + last_processed_timestamp=None, state=None): policy.authorize( @@ -117,6 +130,15 @@ class ScopeState(base.BaseResource): raise http_exceptions.BadRequest( "Either all_scopes or a scope_id should be specified.") + if not state and not last_processed_timestamp: + raise http_exceptions.BadRequest( + "Variables 'state' and 'last_processed_timestamp' cannot be " + "empty/None. We expect at least one of them.") + if state: + LOG.warning("The use of 'state' variable is deprecated, and will " + "be removed in the next upcomming release. You should " + "consider using 'last_processed_timestamp' variable.") + results = self._storage_state.get_all( identifier=scope_id, scope_key=scope_key, @@ -135,8 +157,11 @@ class ScopeState(base.BaseResource): 'collector': r.collector, } for r in results] + if not last_processed_timestamp: + last_processed_timestamp = state self._client.cast({}, 'reset_state', res_data={ - 'scopes': serialized_results, 'state': state.isoformat(), + 'scopes': serialized_results, + 'last_processed_timestamp': last_processed_timestamp.isoformat() }) return {}, 202 diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py index 0087b0ae..979fbc3a 100644 --- a/cloudkitty/orchestrator.py +++ b/cloudkitty/orchestrator.py @@ -174,14 +174,15 @@ class ScopeEndpoint(object): lock_name, ) ) - state_dt = tzutils.dt_from_iso(res_data['state']) + last_processed_timestamp = tzutils.dt_from_iso( + res_data['last_processed_timestamp']) try: - self._storage.delete(begin=state_dt, end=None, filters={ - scope['scope_key']: scope['scope_id'], - }) - self._state.set_state( + self._storage.delete( + begin=last_processed_timestamp, end=None, filters={ + scope['scope_key']: scope['scope_id']}) + self._state.set_last_processed_timestamp( scope['scope_id'], - state_dt, + last_processed_timestamp, fetcher=scope['fetcher'], collector=scope['collector'], scope_key=scope['scope_key'], diff --git a/cloudkitty/storage_state/__init__.py b/cloudkitty/storage_state/__init__.py index c5810c60..0722156a 100644 --- a/cloudkitty/storage_state/__init__.py +++ b/cloudkitty/storage_state/__init__.py @@ -78,7 +78,8 @@ class StateManager(object): session.close() for item in r: - item.state = tzutils.utc_to_local(item.state) + item.last_processed_timestamp = tzutils.utc_to_local( + item.last_processed_timestamp) return r @@ -119,12 +120,26 @@ class StateManager(object): def set_state(self, identifier, state, fetcher=None, collector=None, scope_key=None): - """Set the state of a scope. + """Set the last processed timestamp of a scope. + + This method is deprecated, consider using + "set_last_processed_timestamp". + """ + LOG.warning("The method 'set_state' is deprecated." + "Consider using the new method " + "'set_last_processed_timestamp'.") + self.set_last_processed_timestamp( + identifier, state, fetcher, collector, scope_key) + + def set_last_processed_timestamp( + self, identifier, last_processed_timestamp, fetcher=None, + collector=None, scope_key=None): + """Set the last processed timestamp of a scope. :param identifier: Identifier of the scope :type identifier: str - :param state: state of the scope - :type state: datetime.datetime + :param last_processed_timestamp: last processed timestamp of the scope + :type last_processed_timestamp: datetime.datetime :param fetcher: Fetcher associated to the scope :type fetcher: str :param collector: Collector associated to the scope @@ -132,20 +147,21 @@ class StateManager(object): :param scope_key: scope_key associated to the scope :type scope_key: str """ - state = tzutils.local_to_utc(state, naive=True) + last_processed_timestamp = tzutils.local_to_utc( + last_processed_timestamp, naive=True) session = db.get_session() session.begin() r = self._get_db_item( session, identifier, fetcher, collector, scope_key) if r: - if r.state != state: - r.state = state + if r.last_processed_timestamp != last_processed_timestamp: + r.last_processed_timestamp = last_processed_timestamp session.commit() else: state_object = self.model( identifier=identifier, - state=state, + last_processed_timestamp=last_processed_timestamp, fetcher=fetcher, collector=collector, scope_key=scope_key, @@ -157,7 +173,15 @@ class StateManager(object): def get_state(self, identifier, fetcher=None, collector=None, scope_key=None): - """Get the state of a scope. + LOG.warning("The method 'get_state' is deprecated." + "Consider using the new method" + "'get_last_processed_timestamp'.") + return self.get_last_processed_timestamp( + identifier, fetcher, collector, scope_key) + + def get_last_processed_timestamp(self, identifier, fetcher=None, + collector=None, scope_key=None): + """Get the last processed timestamp of a scope. :param identifier: Identifier of the scope :type identifier: str @@ -174,7 +198,7 @@ class StateManager(object): r = self._get_db_item( session, identifier, fetcher, collector, scope_key) session.close() - return tzutils.utc_to_local(r.state) if r else None + return tzutils.utc_to_local(r.last_processed_timestamp) if r else None def init(self): migration.upgrade('head') diff --git a/cloudkitty/storage_state/alembic/versions/750d3050_create_last_processed_timestamp_column.py b/cloudkitty/storage_state/alembic/versions/750d3050_create_last_processed_timestamp_column.py new file mode 100644 index 00000000..5b47025c --- /dev/null +++ b/cloudkitty/storage_state/alembic/versions/750d3050_create_last_processed_timestamp_column.py @@ -0,0 +1,43 @@ +# Copyright 2019 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Create last processed timestamp column + +Revision ID: 750d3050cf71 +Revises: d9d103dd4dcf +Create Date: 2021-02-08 17:00:00.000 + +""" +from alembic import op + +from cloudkitty.storage_state.alembic.versions import \ + c50ed2c19204_update_storage_state_constraint as down_version_module + +# revision identifiers, used by Alembic. +revision = '750d3050cf71' +down_revision = 'c50ed2c19204' +branch_labels = None +depends_on = None + + +def upgrade(): + for name, table in down_version_module.Base.metadata.tables.items(): + if name == 'cloudkitty_storage_states': + with op.batch_alter_table(name, + copy_from=table, + recreate='always') as batch_op: + batch_op.alter_column( + 'state', new_column_name='last_processed_timestamp') + + break diff --git a/cloudkitty/storage_state/alembic/versions/c50ed2c19204_update_storage_state_constraint.py b/cloudkitty/storage_state/alembic/versions/c50ed2c19204_update_storage_state_constraint.py index 47367eda..1a95be38 100644 --- a/cloudkitty/storage_state/alembic/versions/c50ed2c19204_update_storage_state_constraint.py +++ b/cloudkitty/storage_state/alembic/versions/c50ed2c19204_update_storage_state_constraint.py @@ -20,8 +20,10 @@ Create Date: 2019-05-15 17:02:56.595274 """ from alembic import op +from sqlalchemy.ext import declarative -from cloudkitty.storage_state import models +from oslo_db.sqlalchemy import models +import sqlalchemy # revision identifiers, used by Alembic. revision = 'c50ed2c19204' @@ -29,9 +31,11 @@ down_revision = 'd9d103dd4dcf' branch_labels = None depends_on = None +Base = declarative.declarative_base() + def upgrade(): - for name, table in models.Base.metadata.tables.items(): + for name, table in Base.metadata.tables.items(): if name == 'cloudkitty_storage_states': with op.batch_alter_table(name, @@ -43,3 +47,36 @@ def upgrade(): ['identifier', 'scope_key', 'collector', 'fetcher']) break + + +class IdentifierTableForThisDataBaseModelChangeSet(Base, models.ModelBase): + """Represents the state of a given identifier.""" + + @declarative.declared_attr + def __table_args__(cls): + return ( + sqlalchemy.schema.UniqueConstraint( + 'identifier', + 'scope_key', + 'collector', + 'fetcher', + name='uq_cloudkitty_storage_states_identifier'), + ) + + __tablename__ = 'cloudkitty_storage_states' + + id = sqlalchemy.Column(sqlalchemy.Integer, + primary_key=True) + identifier = sqlalchemy.Column(sqlalchemy.String(256), + nullable=False, + unique=False) + scope_key = sqlalchemy.Column(sqlalchemy.String(40), + nullable=True, + unique=False) + fetcher = sqlalchemy.Column(sqlalchemy.String(40), + nullable=True, + unique=False) + collector = sqlalchemy.Column(sqlalchemy.String(40), + nullable=True, + unique=False) + state = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False) diff --git a/cloudkitty/storage_state/models.py b/cloudkitty/storage_state/models.py index 53edfd46..1155004a 100644 --- a/cloudkitty/storage_state/models.py +++ b/cloudkitty/storage_state/models.py @@ -52,5 +52,5 @@ class IdentifierState(Base, models.ModelBase): collector = sqlalchemy.Column(sqlalchemy.String(40), nullable=True, unique=False) - state = sqlalchemy.Column(sqlalchemy.DateTime, - nullable=False) + last_processed_timestamp = sqlalchemy.Column( + sqlalchemy.DateTime, nullable=False) diff --git a/cloudkitty/tests/storage/v2/test_storage_unit.py b/cloudkitty/tests/storage/v2/test_storage_unit.py index 8957582d..69e95ecc 100644 --- a/cloudkitty/tests/storage/v2/test_storage_unit.py +++ b/cloudkitty/tests/storage/v2/test_storage_unit.py @@ -156,8 +156,6 @@ class StorageUnitTest(TestCase): def test_get_total_one_scope_one_period(self): expected_total, expected_qty, _ = self._expected_total_qty_len( [self.data[0]], self._project_id) - expected_total, expected_qty, _ = self._expected_total_qty_len( - [self.data[0]], self._project_id) begin = datetime.datetime(2018, 1, 1) end = datetime.datetime(2018, 1, 1, 1) diff --git a/cloudkitty/tests/test_orchestrator.py b/cloudkitty/tests/test_orchestrator.py index e8996bbf..b1b1cd76 100644 --- a/cloudkitty/tests/test_orchestrator.py +++ b/cloudkitty/tests/test_orchestrator.py @@ -54,8 +54,9 @@ class ScopeEndpointTest(tests.TestCase): storage_delete_patch = mock.patch.object( influx.InfluxStorage, 'delete') + state_set_patch = mock.patch.object( - storage_state.StateManager, 'set_state') + storage_state.StateManager, 'set_last_processed_timestamp') with coord_start_patch, lock_acquire_patch, \ storage_delete_patch as sd, state_set_patch as ss: @@ -76,7 +77,7 @@ class ScopeEndpointTest(tests.TestCase): 'fetcher': 'gnocchi', }, ], - 'state': '20190716T085501Z', + 'last_processed_timestamp': '20190716T085501Z', }) sd.assert_has_calls([ diff --git a/cloudkitty/tests/test_storage_state.py b/cloudkitty/tests/test_storage_state.py index 7fddbb45..85cf942a 100644 --- a/cloudkitty/tests/test_storage_state.py +++ b/cloudkitty/tests/test_storage_state.py @@ -56,12 +56,12 @@ class StateManagerTest(tests.TestCase): return output, mock.Mock(return_value=output) @staticmethod - def _get_r_mock(scope_key, collector, fetcher, state): + def _get_r_mock(scope_key, collector, fetcher, last_processed_timestamp): r_mock = mock.Mock() r_mock.scope_key = scope_key r_mock.collector = collector r_mock.fetcher = fetcher - r_mock.state = state + r_mock.last_processed_timestamp = last_processed_timestamp return r_mock def _test_x_state_does_update_columns(self, func): @@ -127,6 +127,6 @@ class StateManagerTest(tests.TestCase): sm.return_value = session_mock = mock.MagicMock() self.assertNotEqual(r_mock.state, new_state) self._state.set_state('fake_identifier', new_state) - self.assertEqual(r_mock.state, new_state) + self.assertEqual(r_mock.last_processed_timestamp, new_state) session_mock.commit.assert_called_once() session_mock.add.assert_not_called() diff --git a/doc/source/api-reference/v2/api_samples/scope/scope_get.json b/doc/source/api-reference/v2/api_samples/scope/scope_get.json index d8a1d67a..0e1ce4b7 100644 --- a/doc/source/api-reference/v2/api_samples/scope/scope_get.json +++ b/doc/source/api-reference/v2/api_samples/scope/scope_get.json @@ -5,21 +5,21 @@ "fetcher": "keystone", "scope_id": "7a7e5183264644a7a79530eb56e59941", "scope_key": "project_id", - "state": "2019-05-09 10:00:00" + "last_processed_timestamp": "2019-05-09 10:00:00" }, { "collector": "gnocchi", "fetcher": "keystone", "scope_id": "9084fadcbd46481788e0ad7405dcbf12", "scope_key": "project_id", - "state": "2019-05-08 03:00:00" + "last_processed_timestamp": "2019-05-08 03:00:00" }, { "collector": "gnocchi", "fetcher": "keystone", "scope_id": "1f41d183fca5490ebda5c63fbaca026a", "scope_key": "project_id", - "state": "2019-05-06 22:00:00" + "last_processed_timestamp": "2019-05-06 22:00:00" } ] } diff --git a/doc/source/api-reference/v2/scope/scope.inc b/doc/source/api-reference/v2/scope/scope.inc index 276d1a5c..da1103cc 100644 --- a/doc/source/api-reference/v2/scope/scope.inc +++ b/doc/source/api-reference/v2/scope/scope.inc @@ -40,6 +40,7 @@ Response - collector: collector_resp - fetcher: fetcher_resp - state: state + - last_processed_timestamp: last_processed_timestamp - scope_id: scope_id_resp - scope_key: scope_key_resp @@ -60,6 +61,7 @@ Reset the status of several scopes. .. rest_parameters:: scope/scope_parameters.yml - state: state + - last_processed_timestamp: last_processed_timestamp - collector: collector_body - fetcher: fetcher_body - scope_id: scope_id_body diff --git a/doc/source/api-reference/v2/scope/scope_parameters.yml b/doc/source/api-reference/v2/scope/scope_parameters.yml index 0d5ba0ba..3be8107a 100644 --- a/doc/source/api-reference/v2/scope/scope_parameters.yml +++ b/doc/source/api-reference/v2/scope/scope_parameters.yml @@ -66,6 +66,13 @@ fetcher_resp: description: Fetcher for the given scope in: body +last_processed_timestamp: + in: body + description: | + It represents the last processed timestamp for the storage state element. + type: iso8601 timestamp + required: true + scope_id_body: <<: *scope_id in: body @@ -89,6 +96,9 @@ scope_key_resp: state: in: body description: | - State of the scope. + State of the scope. This variable represents the last processed + timestamp for the storage state element. It is DEPRECATED, and it will + be removed in upcoming releases. The alternative is + `last_processed_timestamp`. type: iso8601 timestamp required: true