diff --git a/gnocchi/storage/_carbonara.py b/gnocchi/storage/_carbonara.py index 70a05148..6ab6b931 100644 --- a/gnocchi/storage/_carbonara.py +++ b/gnocchi/storage/_carbonara.py @@ -21,7 +21,6 @@ import operator from concurrent import futures import iso8601 -import msgpack from oslo_config import cfg from oslo_log import log from oslo_utils import timeutils @@ -354,83 +353,6 @@ class CarbonaraBasedStorage(storage.StorageDriver): aggregation, granularity, version=3): raise NotImplementedError - def _check_for_metric_upgrade(self, metric): - # FIXME(gordc): this is only required for v2.x to v3.x storage upgrade. - # we should make storage version easily detectable rather than - # checking each metric individually - lock = self._lock(metric.id) - with lock: - try: - old_unaggregated = self._get_unaggregated_timeserie_and_unserialize_v2( # noqa - metric) - except (storage.MetricDoesNotExist, CorruptionError) as e: - # This case can happen if v3.0 to v3.x or if no measures - # pushed. skip the rest of upgrade on metric. - LOG.debug( - "Unable to find v2 unaggregated timeserie for " - "metric %s, no data to upgrade: %s", - metric.id, e) - return - - unaggregated = carbonara.BoundTimeSerie( - ts=old_unaggregated.ts, - block_size=metric.archive_policy.max_block_size, - back_window=metric.archive_policy.back_window) - # Upgrade unaggregated timeserie to v3 - self._store_unaggregated_timeserie( - metric, unaggregated.serialize()) - oldest_mutable_timestamp = ( - unaggregated.first_block_timestamp() - ) - for agg_method, d in itertools.product( - metric.archive_policy.aggregation_methods, - metric.archive_policy.definition): - LOG.debug( - "Checking if the metric %s needs migration for %s", - metric, agg_method) - - try: - all_keys = self._list_split_keys_for_metric( - metric, agg_method, d.granularity, version=2) - except storage.MetricDoesNotExist: - # Just try the next metric, this one has no measures - break - else: - LOG.info("Migrating metric %s to new format", metric) - timeseries = filter( - lambda x: x is not None, - self._map_in_thread( - self._get_measures_and_unserialize_v2, - ((metric, key, agg_method, d.granularity) - for key in all_keys)) - ) - ts = carbonara.AggregatedTimeSerie.from_timeseries( - sampling=d.granularity, - aggregation_method=agg_method, - timeseries=timeseries, max_size=d.points) - for key, split in ts.split(): - self._store_timeserie_split( - metric, key, split, - ts.aggregation_method, - d, oldest_mutable_timestamp) - for key in all_keys: - self._delete_metric_measures( - metric, key, agg_method, - d.granularity, version=None) - self._delete_unaggregated_timeserie(metric, version=None) - LOG.info("Migrated metric %s to new format", metric) - - def upgrade(self, index): - marker = None - while True: - metrics = [(metric,) for metric in - index.list_metrics(limit=self.UPGRADE_BATCH_SIZE, - marker=marker)] - self._map_in_thread(self._check_for_metric_upgrade, metrics) - if len(metrics) == 0: - break - marker = metrics[-1][0].id - def process_new_measures(self, indexer, metrics_to_process, sync=False): # process only active metrics. deleted metrics with unprocessed @@ -645,31 +567,3 @@ class CarbonaraBasedStorage(storage.StorageDriver): # We use 'list' to iterate all threads here to raise the first # exception now, not much choice return list(executor.map(lambda args: method(*args), list_of_args)) - - @staticmethod - def _unserialize_timeserie_v2(data): - return carbonara.TimeSerie.from_data( - *carbonara.TimeSerie._timestamps_and_values_from_dict( - msgpack.loads(data, encoding='utf-8')['values']), - clean=True) - - def _get_unaggregated_timeserie_and_unserialize_v2(self, metric): - """Unserialization method for unaggregated v2 timeseries.""" - data = self._get_unaggregated_timeserie(metric, version=None) - try: - return self._unserialize_timeserie_v2(data) - except ValueError: - LOG.error("Data corruption detected for %s ignoring.", metric.id) - - def _get_measures_and_unserialize_v2(self, metric, key, - aggregation, granularity): - """Unserialization method for upgrading v2 objects. Upgrade only.""" - data = self._get_measures( - metric, key, aggregation, granularity, version=None) - try: - return self._unserialize_timeserie_v2(data) - except ValueError: - LOG.error("Data corruption detected for %s " - "aggregated `%s' timeserie, granularity `%s' " - "around time `%s', ignoring.", - metric.id, aggregation, granularity, key) diff --git a/gnocchi/storage/ceph.py b/gnocchi/storage/ceph.py index f261d41c..4d5d930b 100644 --- a/gnocchi/storage/ceph.py +++ b/gnocchi/storage/ceph.py @@ -50,35 +50,6 @@ class CephStorage(_carbonara.CarbonaraBasedStorage): ceph.close_rados_connection(self.rados, self.ioctx) super(CephStorage, self).stop() - def _check_for_metric_upgrade(self, metric): - lock = self._lock(metric.id) - with lock: - container = "gnocchi_%s_container" % metric.id - unagg_obj = self._build_unaggregated_timeserie_path(metric, 3) - try: - xattrs = tuple(k for k, v in self.ioctx.get_xattrs(container)) - except rados.ObjectNotFound: - # this means already upgraded or some corruption? move on. - pass - else: - # if xattrs are found, it means we're coming from - # gnocchiv2. migrate to omap accordingly. - if xattrs: - keys = xattrs - # if no xattrs but object exists, it means it already - # migrated to v3 and now upgrade to use single object - else: - with rados.ReadOpCtx() as op: - omaps, ret = self.ioctx.get_omap_vals(op, "", "", -1) - self.ioctx.operate_read_op(op, container) - keys = (k for k, __ in omaps) - with rados.WriteOpCtx() as op: - self.ioctx.set_omap(op, keys, - tuple([b""] * len(keys))) - self.ioctx.operate_write_op(op, unagg_obj) - self.ioctx.remove_object(container) - super(CephStorage, self)._check_for_metric_upgrade(metric) - @staticmethod def _get_object_name(metric, timestamp_key, aggregation, granularity, version=3): diff --git a/gnocchi/storage/incoming/_carbonara.py b/gnocchi/storage/incoming/_carbonara.py index 0c349e9d..dc77d2d1 100644 --- a/gnocchi/storage/incoming/_carbonara.py +++ b/gnocchi/storage/incoming/_carbonara.py @@ -18,7 +18,6 @@ import itertools import struct from oslo_log import log -from oslo_serialization import msgpackutils import pandas import six.moves @@ -38,14 +37,10 @@ class CarbonaraBasedStorage(incoming.StorageDriver): measures = struct.unpack( "<" + self._MEASURE_SERIAL_FORMAT * nb_measures, data) except struct.error: - # This either a corruption, either a v2 measures - try: - return msgpackutils.loads(data) - except ValueError: - LOG.error( - "Unable to decode measure %s, possible data corruption", - measure_id) - raise + LOG.error( + "Unable to decode measure %s, possible data corruption", + measure_id) + raise return six.moves.zip( pandas.to_datetime(measures[::2], unit='ns'), itertools.islice(measures, 1, len(measures), 2)) diff --git a/gnocchi/tests/storage/test_carbonara.py b/gnocchi/tests/storage/test_carbonara.py deleted file mode 100644 index 62b89482..00000000 --- a/gnocchi/tests/storage/test_carbonara.py +++ /dev/null @@ -1,210 +0,0 @@ -# -*- encoding: utf-8 -*- -# -# Copyright © 2015-2016 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import datetime -import itertools -import uuid - -import mock -import msgpack -import six - -from gnocchi import carbonara -from gnocchi import storage -from gnocchi.storage import _carbonara -from gnocchi.tests import base as tests_base -from gnocchi import utils - - -def _serialize_v2(split): - d = {'values': dict((timestamp.value, float(v)) - for timestamp, v - in six.iteritems(split.ts.dropna()))} - return msgpack.dumps(d) - - -class TestCarbonaraMigration(tests_base.TestCase): - def setUp(self): - super(TestCarbonaraMigration, self).setUp() - if not isinstance(self.storage, _carbonara.CarbonaraBasedStorage): - self.skipTest("This driver is not based on Carbonara") - - self.metric = storage.Metric(uuid.uuid4(), - self.archive_policies['low']) - - self.storage._create_metric(self.metric) - - with mock.patch('gnocchi.carbonara.SplitKey.' - 'POINTS_PER_SPLIT', 14400): - bts = carbonara.BoundTimeSerie( - block_size=self.metric.archive_policy.max_block_size, - back_window=self.metric.archive_policy.back_window) - # NOTE: there is a split at 2016-07-18 on granularity 300 - values = ((datetime.datetime(2016, 7, 17, 23, 59, 0), 4), - (datetime.datetime(2016, 7, 17, 23, 59, 4), 5), - (datetime.datetime(2016, 7, 17, 23, 59, 9), 6), - (datetime.datetime(2016, 7, 18, 0, 0, 0), 7), - (datetime.datetime(2016, 7, 18, 0, 0, 4), 8), - (datetime.datetime(2016, 7, 18, 0, 0, 9), 9)) - - def _before_truncate(bound_timeserie): - for d, agg in itertools.product( - self.metric.archive_policy.definition, - ['mean', 'max']): - grouped = bound_timeserie.group_serie( - d.granularity, carbonara.round_timestamp( - bound_timeserie.first, d.granularity * 10e8)) - - aggts = carbonara.AggregatedTimeSerie.from_grouped_serie( - grouped, d.granularity, agg, max_size=d.points) - - for key, split in aggts.split(): - self.storage._store_metric_measures( - self.metric, - str(key), - agg, d.granularity, - _serialize_v2(split), offset=None, version=None) - - bts.set_values(values, before_truncate_callback=_before_truncate) - self.storage._store_unaggregated_timeserie(self.metric, - _serialize_v2(bts), - version=None) - - def upgrade(self): - with mock.patch.object(self.index, 'list_metrics') as f: - f.side_effect = [[self.metric], []] - self.storage.upgrade(self.index) - - def test_get_measures(self): - with mock.patch.object( - self.storage, '_get_measures_and_unserialize', - side_effect=self.storage._get_measures_and_unserialize_v2): - self.assertEqual([ - (utils.datetime_utc(2016, 7, 17), 86400, 5), - (utils.datetime_utc(2016, 7, 18), 86400, 8), - (utils.datetime_utc(2016, 7, 17, 23), 3600, 5), - (utils.datetime_utc(2016, 7, 18, 0), 3600, 8), - (utils.datetime_utc(2016, 7, 17, 23, 55), 300, 5), - (utils.datetime_utc(2016, 7, 18, 0), 300, 8) - ], self.storage.get_measures(self.metric)) - - self.assertEqual([ - (utils.datetime_utc(2016, 7, 17), 86400, 6), - (utils.datetime_utc(2016, 7, 18), 86400, 9), - (utils.datetime_utc(2016, 7, 17, 23), 3600, 6), - (utils.datetime_utc(2016, 7, 18, 0), 3600, 9), - (utils.datetime_utc(2016, 7, 17, 23, 55), 300, 6), - (utils.datetime_utc(2016, 7, 18, 0), 300, 9) - ], self.storage.get_measures(self.metric, aggregation='max')) - - self.upgrade() - - self.assertEqual([ - (utils.datetime_utc(2016, 7, 17), 86400, 5), - (utils.datetime_utc(2016, 7, 18), 86400, 8), - (utils.datetime_utc(2016, 7, 17, 23), 3600, 5), - (utils.datetime_utc(2016, 7, 18, 0), 3600, 8), - (utils.datetime_utc(2016, 7, 17, 23, 55), 300, 5), - (utils.datetime_utc(2016, 7, 18, 0), 300, 8) - ], self.storage.get_measures(self.metric)) - - self.assertEqual([ - (utils.datetime_utc(2016, 7, 17), 86400, 6), - (utils.datetime_utc(2016, 7, 18), 86400, 9), - (utils.datetime_utc(2016, 7, 17, 23), 3600, 6), - (utils.datetime_utc(2016, 7, 18, 0), 3600, 9), - (utils.datetime_utc(2016, 7, 17, 23, 55), 300, 6), - (utils.datetime_utc(2016, 7, 18, 0), 300, 9) - ], self.storage.get_measures(self.metric, aggregation='max')) - - with mock.patch.object( - self.storage, '_get_measures_and_unserialize', - side_effect=self.storage._get_measures_and_unserialize_v2): - self.assertRaises( - storage.AggregationDoesNotExist, - self.storage.get_measures, self.metric) - - self.assertRaises( - storage.AggregationDoesNotExist, - self.storage.get_measures, self.metric, aggregation='max') - - self.storage.incoming.add_measures(self.metric, [ - storage.Measure(utils.dt_to_unix_ns(2016, 7, 18), 69), - storage.Measure(utils.dt_to_unix_ns(2016, 7, 18, 1, 1), 64), - ]) - - with mock.patch.object(self.index, 'list_metrics') as f: - f.side_effect = [[self.metric], []] - self.storage.process_background_tasks( - self.index, [str(self.metric.id)], sync=True) - - self.assertEqual([ - (utils.datetime_utc(2016, 7, 17), 86400, 6), - (utils.datetime_utc(2016, 7, 18), 86400, 69), - (utils.datetime_utc(2016, 7, 17, 23), 3600, 6), - (utils.datetime_utc(2016, 7, 18, 0), 3600, 69), - (utils.datetime_utc(2016, 7, 18, 1), 3600, 64), - (utils.datetime_utc(2016, 7, 18, 0), 300, 69), - (utils.datetime_utc(2016, 7, 18, 1), 300, 64) - ], self.storage.get_measures(self.metric, aggregation='max')) - - def test_upgrade_upgraded_storage(self): - with mock.patch.object( - self.storage, '_get_measures_and_unserialize', - side_effect=self.storage._get_measures_and_unserialize_v2): - self.assertEqual([ - (utils.datetime_utc(2016, 7, 17), 86400, 5), - (utils.datetime_utc(2016, 7, 18), 86400, 8), - (utils.datetime_utc(2016, 7, 17, 23), 3600, 5), - (utils.datetime_utc(2016, 7, 18, 0), 3600, 8), - (utils.datetime_utc(2016, 7, 17, 23, 55), 300, 5), - (utils.datetime_utc(2016, 7, 18, 0), 300, 8) - ], self.storage.get_measures(self.metric)) - - self.assertEqual([ - (utils.datetime_utc(2016, 7, 17), 86400, 6), - (utils.datetime_utc(2016, 7, 18), 86400, 9), - (utils.datetime_utc(2016, 7, 17, 23), 3600, 6), - (utils.datetime_utc(2016, 7, 18, 0), 3600, 9), - (utils.datetime_utc(2016, 7, 17, 23, 55), 300, 6), - (utils.datetime_utc(2016, 7, 18, 0), 300, 9) - ], self.storage.get_measures(self.metric, aggregation='max')) - - self.upgrade() - self.upgrade() - - self.assertEqual([ - (utils.datetime_utc(2016, 7, 17), 86400, 5), - (utils.datetime_utc(2016, 7, 18), 86400, 8), - (utils.datetime_utc(2016, 7, 17, 23), 3600, 5), - (utils.datetime_utc(2016, 7, 18, 0), 3600, 8), - (utils.datetime_utc(2016, 7, 17, 23, 55), 300, 5), - (utils.datetime_utc(2016, 7, 18, 0), 300, 8) - ], self.storage.get_measures(self.metric)) - - self.assertEqual([ - (utils.datetime_utc(2016, 7, 17), 86400, 6), - (utils.datetime_utc(2016, 7, 18), 86400, 9), - (utils.datetime_utc(2016, 7, 17, 23), 3600, 6), - (utils.datetime_utc(2016, 7, 18, 0), 3600, 9), - (utils.datetime_utc(2016, 7, 17, 23, 55), 300, 6), - (utils.datetime_utc(2016, 7, 18, 0), 300, 9) - ], self.storage.get_measures(self.metric, aggregation='max')) - - def test_delete_metric_not_upgraded(self): - # Make sure that we delete everything (e.g. objects + container) - # correctly even if the metric has not been upgraded. - self.storage.delete_metric(self.metric) - self.assertEqual([], self.storage.get_measures(self.metric)) diff --git a/releasenotes/notes/upgrade-code-removal-from-2.2-and-3.0-a01fc64ecb39c327.yaml b/releasenotes/notes/upgrade-code-removal-from-2.2-and-3.0-a01fc64ecb39c327.yaml new file mode 100644 index 00000000..bd0480ca --- /dev/null +++ b/releasenotes/notes/upgrade-code-removal-from-2.2-and-3.0-a01fc64ecb39c327.yaml @@ -0,0 +1,4 @@ +--- +upgrade: + - | + The storage upgrade is only supported from version 3.1. diff --git a/tox.ini b/tox.ini index d7fea39c..bf78f5bb 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] minversion = 1.8 -envlist = py{35,27}-{postgresql,mysql}{,-file,-swift,-ceph,-s3},pep8,bashate,py35-postgresql-file-upgrade-from-2.2,py27-mysql-ceph-upgrade-from-2.2 +envlist = py{35,27}-{postgresql,mysql}{,-file,-swift,-ceph,-s3},pep8,bashate [testenv] usedevelop = True @@ -62,34 +62,6 @@ deps = gnocchi[{env:GNOCCHI_VARIANT}]>=3.1,<3.2 pifpaf>=0.13 commands = pifpaf --env-prefix INDEXER run mysql -- pifpaf --env-prefix STORAGE run ceph {toxinidir}/run-upgrade-tests.sh {posargs} -[testenv:py35-postgresql-file-upgrade-from-2.2] -# We should always recreate since the script upgrade -# Gnocchi we can't reuse the virtualenv -envdir = upgrade -recreate = True -skip_install = True -usedevelop = False -setenv = GNOCCHI_VARIANT=test,postgresql,file -deps = gnocchi[{env:GNOCCHI_VARIANT}]>=2.2,<2.3 - pifpaf>=0.13 - gnocchiclient>=2.8.0 -commands = pifpaf --env-prefix INDEXER run postgresql {toxinidir}/run-upgrade-tests.sh {posargs} - -[testenv:py27-mysql-ceph-upgrade-from-2.2] -# We should always recreate since the script upgrade -# Gnocchi we can't reuse the virtualenv -envdir = upgrade -recreate = True -skip_install = True -usedevelop = False -setenv = GNOCCHI_VARIANT=test,mysql,ceph,ceph_recommended_lib -deps = gnocchi[{env:GNOCCHI_VARIANT}]>=2.2,<2.3 - gnocchiclient>=2.8.0 - pifpaf>=0.13 - cradox -# cradox is required because 2.2 extra names are incorrect -commands = pifpaf --env-prefix INDEXER run mysql -- pifpaf --env-prefix STORAGE run ceph {toxinidir}/run-upgrade-tests.sh {posargs} - [testenv:bashate] deps = bashate commands = bashate -v devstack/plugin.sh devstack/gate/gate_hook.sh devstack/gate/post_test_hook.sh