diff --git a/monasca_persister/config.py b/monasca_persister/config.py
index 00e85367..ae29c4d0 100644
--- a/monasca_persister/config.py
+++ b/monasca_persister/config.py
@@ -45,7 +45,7 @@ def _get_config_files():
     return conf_files
 
 
-def parse_args():
+def parse_args(description='Persists metrics & alarm history in TSDB'):
     global _CONF_LOADED
     if _CONF_LOADED:
         LOG.debug('Configuration has been already loaded')
@@ -58,7 +58,7 @@ def parse_args():
          project='monasca',
          version=version.version_str,
          default_config_files=_get_config_files(),
-         description='Persists metrics & alarm history in TSDB')
+         description=description)
 
     log.setup(CONF,
               product_name='monasca-persister',
diff --git a/monasca_persister/tools/influxdb/db-per-tenant/README.rst b/monasca_persister/tools/influxdb/db-per-tenant/README.rst
new file mode 100644
index 00000000..29e65387
--- /dev/null
+++ b/monasca_persister/tools/influxdb/db-per-tenant/README.rst
@@ -0,0 +1,122 @@
+migrate-to-db-per-tenant.py
+===========================
+
+The general plan for the monasca project is to move in the direction of
+having a database per tenant because:
+
+- It not only gives finer-grained control over the retention policy per
+  tenant, but may also speed up tenant queries by scoping them to their
+  project.
+- Security is improved through better isolation. For example, a previous
+  bug in InfluxDB where the tenant ID was ignored in the query exposed
+  data from outside a tenant's project. This is less likely to happen
+  with a separate DB per tenant.
+- It moves in the direction of improving scalability for InfluxDB users
+  without an Enterprise license. In the future, a dedicated InfluxDB
+  instance could optionally be used per project.
+
+For further reading, see https://storyboard.openstack.org/#!/story/2006331
+
+Every effort has been made to ensure this is a safe process, and it
+should be safe to run the tool multiple times. However, it is provided
+AS IS and you should use it at your own risk.
+
+Usage
+=====
+
+Steps to use this tool:
+
+- Log in to a node where monasca-persister is deployed.
+
+- Identify the installation path of monasca-persister. This may be a
+  virtual environment (which you may need to activate) such as
+  `/opt/stack/venv/monasca-/lib/python2.7/site-packages/monasca_persister`,
+  or, as in devstack, `/opt/stack/monasca-persister/monasca_persister/`.
+
+- Identify the existing configuration for monasca-persister. The likely
+  location is `/etc/monasca/persister.conf` if using devstack.
+
+- Optionally, make a backup of your database in case something goes
+  wrong; your original database should be left intact either way.
+
+- Optionally, open `migrate-to-db-per-tenant.py` and edit the mapping
+  between your existing projects and retention policies, or supply it on
+  the command line via `--migrate-retention-policy`. You may also want
+  to change `end_time_offset_override` to the length of history you want
+  the migration tool to consider.
+
+- Invoke the tool to migrate to a database per tenant. The arguments
+  inside the square brackets are optional and default to the values
+  shown. Retention policy values are integers in units of
+  `--migrate-time-unit`. The query run for each chunk of the migration
+  is sketched at the end of this section.
+
+::
+
+   sudo -u mon-persister python migrate-to-db-per-tenant.py --config-file /etc/monasca/persister.conf --migrate-retention-policy project:2,project2:4 --migrate-skip-regex ^log\\..+ [--migrate-time-unit w --migrate-start-time-offset 0 --migrate-end-time-offset 520]
+
+- The progress of the migration will be logged to the persister log
+  file specified in the persister config.
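+
+For reference, each chunk of the migration is a single InfluxQL
+`SELECT ... INTO` statement rendered from the template in
+`migrate-to-db-per-tenant.py`. The sketch below is illustrative only;
+the tenant ID, measurement and time offsets are hypothetical examples:
+
+::
+
+   MIGRATE_QUERY = ('SELECT * INTO "{target_db}"..:MEASUREMENT'
+                    ' FROM "{measurement}"'
+                    ' WHERE _tenant_id=\'{tenant_id}\''
+                    ' AND time > {lower_time_offset}'
+                    ' AND time <= {upper_time_offset}'
+                    ' GROUP BY *')
+
+   print(MIGRATE_QUERY.format(
+       target_db='monasca_abc123',     # <database_name>_<tenant_id>
+       measurement='cpu.idle_perc',
+       tenant_id='abc123',
+       lower_time_offset='now()-2w',   # one --migrate-time-unit chunk
+       upper_time_offset='now()-1w',
+   ))
+
+`GROUP BY *` preserves the tags of each series, so points land in the
+target database under the same series they came from.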
+
+FAQ
+===
+
+1. Will this interrupt the operation of Monasca?
+
+- In theory, you can run this migration on a live Monasca system
+  because the migration leaves the original database untouched.
+  In practice, however, it may degrade performance while the migration
+  is taking place.
+
+2. How do I ensure that I migrate *all* metrics to the new scheme? i.e.
+   do I need to stop all persisters writing to InfluxDB and let Kafka
+   buffer for a bit while the migration is taking place?
+
+- If you stop the persister and the migration takes longer than the
+  Kafka buffer duration, this may cause data loss. It may be best to do
+  the first pass of the migration on the entire dataset and then perform
+  iterative migrations of the smaller chunks collected while the
+  migration was taking place.
+
+3. Will the original database be left intact so that I can fall back if
+   it doesn't work?
+
+- Yes.
+
+4. What do I do after I have run the tool? Do I need to enable the
+   feature before starting the persisters?
+
+- If you flip on the `db_per_tenant` flag before starting the migration
+  and restart your persister service, the persister will start writing
+  to a database per tenant. It may be best to switch this feature on
+  after the first round of migration, restart the persister so that it
+  starts writing to a database per tenant, and then migrate the
+  remainder of the data. Note that in the second run, you may need to
+  delete `migrate-success` so that the metrics are not skipped. You may
+  also want to try a smaller `--migrate-time-unit` option (default: `w`
+  for week) if larger chunks of the migration fail. The other options
+  are `d` for day and `h` for hour.
+
+5. How long does it take (a rough estimate per GB of metrics)?
+
+- This depends on your storage backend. On one of the systems on which
+  this tool was tested, it was an overnight job to move 40GB of metrics.
+
+6. Is the tool idempotent? i.e. could I migrate the bulk of the data
+   with the persisters running, stop the persisters, and then migrate
+   the delta in a short interval to minimise downtime? Can I run it
+   again if it fails?
+
+- Yes. InfluxDB ensures that points with the same timestamp, tags and
+  fields are not duplicated, so copying things over twice is perfectly
+  safe.
+
+7. Will it set fire to my cat?
+
+- That depends on whether your cat is well behaved.
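+
+Per-project retention policies supplied via `--migrate-retention-policy`
+are expanded by the tool into one InfluxDB retention policy per project
+database. The condensed sketch below mirrors that expansion; the project
+IDs are hypothetical:
+
+::
+
+   time_unit = 'w'                                         # --migrate-time-unit
+   retention = {'project-id-x': '2', 'project-id-y': '4'}  # --migrate-retention-policy
+
+   tenant_defaults = {}
+   for project, n in retention.items():
+       rp = '{}{}'.format(n, time_unit)  # e.g. '2w'
+       tenant_defaults[project] = dict(
+           # only migrate as much history as the policy would retain
+           end_time_offset_override=int(n),
+           rp=dict(name=rp, duration=rp, replication='1', default=True),
+       )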
+
+License
+=======
+
+Copyright (c) 2019 StackHPC Limited
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may
+not use this file except in compliance with the License. You may obtain
+a copy of the License at
+
+::
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/monasca_persister/tools/influxdb/db-per-tenant/migrate-to-db-per-tenant.py b/monasca_persister/tools/influxdb/db-per-tenant/migrate-to-db-per-tenant.py
new file mode 100755
index 00000000..3e1cb4b0
--- /dev/null
+++ b/monasca_persister/tools/influxdb/db-per-tenant/migrate-to-db-per-tenant.py
@@ -0,0 +1,237 @@
+# (C) Copyright 2019 StackHPC Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Monasca InfluxDB tool for migration to a database per tenant.
+
+    Used to move data from a monolithic database, e.g. `monasca`, to a
+    database per tenant model, e.g. `monasca_<tenant_id>`.
+
+    Please see the included README.rst for more details about creating an
+    appropriate configuration file.
+"""
+
+import os
+import re
+import sys
+
+from monasca_persister import config
+from monasca_persister.repositories.influxdb import metrics_repository
+
+from oslo_config import cfg
+from oslo_log import log
+
+LOG = log.getLogger(__name__)
+
+MIGRATE_QUERY = ('SELECT * INTO "{target_db}"..:MEASUREMENT'
+                 ' FROM "{measurement}"'
+                 ' WHERE _tenant_id=\'{tenant_id}\''
+                 ' AND time > {lower_time_offset}'
+                 ' AND time <= {upper_time_offset}'
+                 ' GROUP BY *')
+
+
+class MigrationHelper(object):
+
+    def __init__(self):
+        repo = metrics_repository.MetricInfluxdbRepository()
+        self.conf = repo.conf
+        self.client = repo._influxdb_client
+        self.client.switch_database(self.conf.influxdb.database_name)
+
+    def _migrate(self, measurement, tenant_id, start_time_offset,
+                 end_time_offset, retention_policy=None, time_unit='w',
+                 db_per_tenant=True, **kwargs):
+
+        total_written = 0
+        first_upper_time_offset = None
+        last_lower_time_offset = None
+        time_offset = start_time_offset
+
+        # Default to the original database so that target_db is always
+        # defined, even when not migrating to a database per tenant.
+        target_db = self.conf.influxdb.database_name
+        if db_per_tenant:
+            target_db = '{}_{}'.format(target_db, tenant_id)
+            self.client.create_database(target_db)
+            if retention_policy:
+                self.client.create_retention_policy(database=target_db,
+                                                    **retention_policy)
+        LOG.info(' into {}:'.format(target_db))
+
+        while end_time_offset > 0 and time_offset < end_time_offset:
+            lower_time_offset = 'now()-{}{}'.format(time_offset + 1, time_unit)
+            upper_time_offset = 'now()-{}{}'.format(time_offset, time_unit)
+            if not first_upper_time_offset:
+                first_upper_time_offset = upper_time_offset
+            migrate_query = MIGRATE_QUERY.format(
+                target_db=target_db,
+                measurement=measurement,
+                tenant_id=tenant_id,
+                lower_time_offset=lower_time_offset,
+                upper_time_offset=upper_time_offset,
+            )
+            LOG.debug(migrate_query)
+
+            written = next(self.client.query(migrate_query)
+                           .get_points('result')).get('written')
+            total_written += written
+            time_offset += 1
+            if written > 0:
+                last_lower_time_offset = lower_time_offset
+                LOG.info('  migrated {} entries from {} -> {} (cumulative {})'.format(
+                    written,
+                    lower_time_offset,
+                    upper_time_offset,
+                    total_written,
+                ))
+        LOG.info('  finished migrating a total of {} entries from {} -> {}.'.format(
+            total_written,
+            last_lower_time_offset,
+            first_upper_time_offset,
+        ))
+
+    def get_measurements(self, fname):
+        measurements = []
+        if fname:
+            # 'a+' creates the file if it is missing, but may position the
+            # stream at the end, so rewind before reading.
+            with open(fname, 'a+') as f:
+                f.seek(0)
+                measurements = [l.strip() for l in f.readlines()]
+        if not measurements:
+            result = self.client.query('SHOW MEASUREMENTS').get_points('measurements')
+            measurements = [m.get('name') for m in result]
+            if fname:
+                with open(fname, 'w') as f:
+                    for r in measurements:
+                        f.write(r + '\n')
+        return measurements
+
+    def get_tenancies(self, measurements):
+        result = self.client.query('SHOW TAG VALUES WITH KEY = _tenant_id')
+        return {m: [t.get('value') for t in result.get_points(m)]
+                for m in measurements}
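+
+    # get_tenancies() returns a mapping from measurement name to the
+    # tenant IDs observed in it, e.g. (tenant IDs are hypothetical):
+    #     {'cpu.idle_perc': ['abc123', 'def456'],
+    #      'disk.space_used_perc': ['abc123']}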
+
+    def get_complete(self, fname):
+        if fname:
+            # 'a+' creates the file if it is missing; rewind before reading.
+            with open(fname, 'a+') as fd:
+                fd.seek(0)
+                return {l.strip() for l in fd.readlines()}
+        else:
+            return set()
+
+    def migrate(self,
+                tenant_defaults=None,
+                default_start_time_offset=0,   # Default: now
+                default_end_time_offset=520,   # Default: 520 weeks ~= 10 years
+                skip_regex=None,
+                measurements_file=None, success_file=None, failure_file=None,
+                **kwargs):
+        tenant_defaults = tenant_defaults or {}
+        skip_regex = skip_regex or []
+        measurements = self.get_measurements(measurements_file)
+        tenancy = self.get_tenancies(measurements)
+        done = self.get_complete(success_file)
+        skip = set()
+        fail = set()
+        if failure_file and os.path.exists(failure_file):
+            os.remove(failure_file)
+
+        filtered_measurements = []
+        for measurement in measurements:
+            if any(f.match(measurement) for f in skip_regex):
+                skip.add(measurement)
+                LOG.debug('Skipping {} because it matches a skip regex.'.format(measurement))
+            elif measurement in done:
+                LOG.debug('Skipping {} because it is already done.'.format(measurement))
+            else:
+                filtered_measurements.append(measurement)
+
+        for i, measurement in enumerate(filtered_measurements):
+            LOG.info('Migrating {}'.format(measurement))
+            try:
+                for tenant_id in tenancy.get(measurement):
+                    start_time_offset = tenant_defaults.get(
+                        tenant_id, {}).get('start_time_offset_override',
+                                           default_start_time_offset)
+                    end_time_offset = tenant_defaults.get(
+                        tenant_id, {}).get('end_time_offset_override',
+                                           default_end_time_offset)
+                    retention_policy = tenant_defaults.get(
+                        tenant_id, {}).get('rp', {})
+                    self._migrate(measurement, tenant_id,
+                                  start_time_offset=start_time_offset,
+                                  end_time_offset=end_time_offset,
+                                  retention_policy=retention_policy, **kwargs)
+                if success_file:
+                    with open(success_file, 'a+') as fd:
+                        fd.write('{}\n'.format(measurement))
+                done.add(measurement)
+            except Exception as e:
+                LOG.error(e)
+                if failure_file:
+                    with open(failure_file, 'a+') as fe:
+                        fe.write('{}\t{}\n'.format(measurement, e))
+                fail.add(measurement)
+            LOG.info('{}/{} (done {} + skip {} + fail {})/{}'.format(
+                i + 1, len(filtered_measurements), len(done), len(skip),
+                len(fail), len(measurements)))
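+
+
+# Minimal sketch of driving the helper programmatically rather than via
+# main() (assumes the persister config has already been parsed with
+# config.parse_args(); the values shown are hypothetical):
+#
+#     helper = MigrationHelper()
+#     helper.migrate(skip_regex=[re.compile(r'^log\..+')],
+#                    default_end_time_offset=52,  # one year, in weeks
+#                    time_unit='w')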
+
+
+def main():
+    CONF = cfg.CONF
+    cli_opts = [
+        cfg.StrOpt('migrate-time-unit', choices=['h', 'd', 'w'], default='w',
+                   help='Unit of time, h=hour, d=day, w=week (default: "w").'),
+        cfg.IntOpt('migrate-start-time-offset', default=0,
+                   help='Start time offset in the given unit of time (default: 0).'),
+        cfg.IntOpt('migrate-end-time-offset', default=520,
+                   help='End time offset in the given unit of time (default: 520).'),
+        cfg.DictOpt('migrate-retention-policy', default={},
+                    help=('Custom retention policy for projects in the provided '
+                          'time unit (e.g. project-id-x:2,project-id-y:4).')),
+        cfg.ListOpt('migrate-skip-regex', default=[],
+                    help=('Skip metrics that match this comma separated list of '
+                          'regex patterns (e.g. ^log\\\\..+,^cpu\\\\..+ to skip '
+                          'metrics beginning with log. or cpu.).')),
+    ]
+    CONF.register_cli_opts(cli_opts)
+    config.parse_args('Monasca InfluxDB database per tenant migration tool')
+
+    # Configure a custom retention policy for your existing projects. For
+    # example, '2w' is a retention policy of two weeks, which we can assign
+    # to a project with --migrate-retention-policy example-project-id:2.
+    tenant_defaults = dict()
+    for k, v in CONF.migrate_retention_policy.items():
+        if v.isdigit():
+            rp = '{}{}'.format(v, CONF.migrate_time_unit)
+            tenant_defaults[k] = dict(
+                end_time_offset_override=int(v),
+                rp=dict(name=rp, duration=rp, replication='1', default=True),
+            )
+            LOG.info('Retention policy {} will be applied to project {}.'.format(rp, k))
+        else:
+            raise ValueError('Retention policy for project {} must be an '
+                             'integer of the given time unit. Current value: '
+                             '{}.'.format(k, v))
+
+    skip_regex = []
+    for p in CONF.migrate_skip_regex:
+        skip_regex.append(re.compile(str(p)))
+        LOG.info('Metrics matching pattern "{}" will be skipped.'.format(p))
+
+    helper = MigrationHelper()
+    helper.migrate(skip_regex=skip_regex,
+                   tenant_defaults=tenant_defaults,
+                   default_end_time_offset=CONF.migrate_end_time_offset,
+                   default_start_time_offset=CONF.migrate_start_time_offset,
+                   time_unit=CONF.migrate_time_unit,
+                   measurements_file='migrate-measurements',
+                   success_file='migrate-success',
+                   failure_file='migrate-failure')
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
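+
+# After a successful run the metrics are laid out one database per
+# tenant, with the original database left intact, e.g. (tenant IDs are
+# hypothetical):
+#
+#     > SHOW DATABASES
+#     monasca
+#     monasca_abc123
+#     monasca_def456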