From e778c76ea49d19adb28935a1a01f09df993124a8 Mon Sep 17 00:00:00 2001 From: Scott Little Date: Mon, 13 Aug 2018 11:39:37 -0400 Subject: [PATCH] Relocate ceph-manager to stx-integ/ceph/ceph-manager Move content from stx-upstream to stx-integ Packages will be relocated to stx-integ: ceph/ ceph ceph-manager Change-Id: I129faa448e2e52fc82101ae7ebc8ad5688f21523 Story: 2002801 Task: 22687 Signed-off-by: Scott Little --- ceph/ceph-manager/.gitignore | 6 + ceph/ceph-manager/LICENSE | 202 ++++ ceph/ceph-manager/PKG-INFO | 13 + ceph/ceph-manager/centos/build_srpm.data | 3 + ceph/ceph-manager/centos/ceph-manager.spec | 70 ++ ceph/ceph-manager/ceph-manager/LICENSE | 202 ++++ .../ceph-manager/ceph_manager/__init__.py | 5 + .../ceph_manager/cache_tiering.py | 705 ++++++++++++++ .../ceph-manager/ceph_manager/ceph.py | 164 ++++ .../ceph-manager/ceph_manager/constants.py | 107 +++ .../ceph-manager/ceph_manager/exception.py | 130 +++ .../ceph-manager/ceph_manager/i18n.py | 15 + .../ceph-manager/ceph_manager/monitor.py | 893 ++++++++++++++++++ .../ceph-manager/ceph_manager/server.py | 249 +++++ .../ceph_manager/tests/__init__.py | 0 .../ceph_manager/tests/test_cache_flush.py | 309 ++++++ ceph/ceph-manager/ceph-manager/setup.py | 19 + .../ceph-manager/test-requirements.txt | 10 + ceph/ceph-manager/ceph-manager/tox.ini | 29 + .../ceph-manager/files/ceph-manager.logrotate | 11 + ceph/ceph-manager/files/ceph-manager.service | 17 + ceph/ceph-manager/scripts/bin/ceph-manager | 17 + ceph/ceph-manager/scripts/init.d/ceph-manager | 103 ++ 23 files changed, 3279 insertions(+) create mode 100644 ceph/ceph-manager/.gitignore create mode 100644 ceph/ceph-manager/LICENSE create mode 100644 ceph/ceph-manager/PKG-INFO create mode 100644 ceph/ceph-manager/centos/build_srpm.data create mode 100644 ceph/ceph-manager/centos/ceph-manager.spec create mode 100644 ceph/ceph-manager/ceph-manager/LICENSE create mode 100644 ceph/ceph-manager/ceph-manager/ceph_manager/__init__.py create mode 100644 ceph/ceph-manager/ceph-manager/ceph_manager/cache_tiering.py create mode 100644 ceph/ceph-manager/ceph-manager/ceph_manager/ceph.py create mode 100644 ceph/ceph-manager/ceph-manager/ceph_manager/constants.py create mode 100644 ceph/ceph-manager/ceph-manager/ceph_manager/exception.py create mode 100644 ceph/ceph-manager/ceph-manager/ceph_manager/i18n.py create mode 100644 ceph/ceph-manager/ceph-manager/ceph_manager/monitor.py create mode 100644 ceph/ceph-manager/ceph-manager/ceph_manager/server.py create mode 100644 ceph/ceph-manager/ceph-manager/ceph_manager/tests/__init__.py create mode 100644 ceph/ceph-manager/ceph-manager/ceph_manager/tests/test_cache_flush.py create mode 100644 ceph/ceph-manager/ceph-manager/setup.py create mode 100644 ceph/ceph-manager/ceph-manager/test-requirements.txt create mode 100644 ceph/ceph-manager/ceph-manager/tox.ini create mode 100644 ceph/ceph-manager/files/ceph-manager.logrotate create mode 100644 ceph/ceph-manager/files/ceph-manager.service create mode 100644 ceph/ceph-manager/scripts/bin/ceph-manager create mode 100644 ceph/ceph-manager/scripts/init.d/ceph-manager diff --git a/ceph/ceph-manager/.gitignore b/ceph/ceph-manager/.gitignore new file mode 100644 index 00000000..78868598 --- /dev/null +++ b/ceph/ceph-manager/.gitignore @@ -0,0 +1,6 @@ +!.distro +.distro/centos7/rpmbuild/RPMS +.distro/centos7/rpmbuild/SRPMS +.distro/centos7/rpmbuild/BUILD +.distro/centos7/rpmbuild/BUILDROOT +.distro/centos7/rpmbuild/SOURCES/ceph-manager*tar.gz diff --git a/ceph/ceph-manager/LICENSE b/ceph/ceph-manager/LICENSE 
new file mode 100644 index 00000000..d6456956 --- /dev/null +++ b/ceph/ceph-manager/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. 
Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/ceph/ceph-manager/PKG-INFO b/ceph/ceph-manager/PKG-INFO new file mode 100644 index 00000000..5b6746d8 --- /dev/null +++ b/ceph/ceph-manager/PKG-INFO @@ -0,0 +1,13 @@ +Metadata-Version: 1.1 +Name: ceph-manager +Version: 1.0 +Summary: Handle Ceph API calls and provide status updates via alarms +Home-page: +Author: Windriver +Author-email: info@windriver.com +License: Apache-2.0 + +Description: Handle Ceph API calls and provide status updates via alarms + + +Platform: UNKNOWN diff --git a/ceph/ceph-manager/centos/build_srpm.data b/ceph/ceph-manager/centos/build_srpm.data new file mode 100644 index 00000000..d01510bd --- /dev/null +++ b/ceph/ceph-manager/centos/build_srpm.data @@ -0,0 +1,3 @@ +SRC_DIR="ceph-manager" +COPY_LIST_TO_TAR="files scripts" +TIS_PATCH_VER=4 diff --git a/ceph/ceph-manager/centos/ceph-manager.spec b/ceph/ceph-manager/centos/ceph-manager.spec new file mode 100644 index 00000000..2f54deb5 --- /dev/null +++ b/ceph/ceph-manager/centos/ceph-manager.spec @@ -0,0 +1,70 @@ +Summary: Handle Ceph API calls and provide status updates via alarms +Name: ceph-manager +Version: 1.0 +Release: %{tis_patch_ver}%{?_tis_dist} +License: Apache-2.0 +Group: base +Packager: Wind River +URL: unknown +Source0: %{name}-%{version}.tar.gz + +BuildRequires: python-setuptools +BuildRequires: systemd-units +BuildRequires: systemd-devel +Requires: sysinv + +%description +Handle Ceph API calls and provide status updates via alarms. +Handle sysinv RPC calls for long running Ceph API operations: +- cache tiering enable +- cache tiering disable + +%define local_bindir /usr/bin/ +%define local_etc_initd /etc/init.d/ +%define local_etc_logrotated /etc/logrotate.d/ +%define pythonroot /usr/lib64/python2.7/site-packages + +%define debug_package %{nil} + +%prep +%setup + +%build +%{__python} setup.py build + +%install +%{__python} setup.py install --root=$RPM_BUILD_ROOT \ + --install-lib=%{pythonroot} \ + --prefix=/usr \ + --install-data=/usr/share \ + --single-version-externally-managed + +install -d -m 755 %{buildroot}%{local_etc_initd} +install -p -D -m 700 scripts/init.d/ceph-manager %{buildroot}%{local_etc_initd}/ceph-manager + +install -d -m 755 %{buildroot}%{local_bindir} +install -p -D -m 700 scripts/bin/ceph-manager %{buildroot}%{local_bindir}/ceph-manager + +install -d -m 755 %{buildroot}%{local_etc_logrotated} +install -p -D -m 644 files/ceph-manager.logrotate %{buildroot}%{local_etc_logrotated}/ceph-manager.logrotate + +install -d -m 755 %{buildroot}%{_unitdir} +install -m 644 -p -D files/%{name}.service %{buildroot}%{_unitdir}/%{name}.service + +%clean +rm -rf $RPM_BUILD_ROOT + +# Note: The package name is ceph-manager but the import name is ceph_manager so +# can't use '%{name}'. 
+%files +%defattr(-,root,root,-) +%doc LICENSE +%{local_bindir}/* +%{local_etc_initd}/* +%{_unitdir}/%{name}.service +%dir %{local_etc_logrotated} +%{local_etc_logrotated}/* +%dir %{pythonroot}/ceph_manager +%{pythonroot}/ceph_manager/* +%dir %{pythonroot}/ceph_manager-%{version}.0-py2.7.egg-info +%{pythonroot}/ceph_manager-%{version}.0-py2.7.egg-info/* diff --git a/ceph/ceph-manager/ceph-manager/LICENSE b/ceph/ceph-manager/ceph-manager/LICENSE new file mode 100644 index 00000000..d6456956 --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/ceph/ceph-manager/ceph-manager/ceph_manager/__init__.py b/ceph/ceph-manager/ceph-manager/ceph_manager/__init__.py new file mode 100644 index 00000000..754a8f4e --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/ceph_manager/__init__.py @@ -0,0 +1,5 @@ +# +# Copyright (c) 2016 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# diff --git a/ceph/ceph-manager/ceph-manager/ceph_manager/cache_tiering.py b/ceph/ceph-manager/ceph-manager/ceph_manager/cache_tiering.py new file mode 100644 index 00000000..4e814c3b --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/ceph_manager/cache_tiering.py @@ -0,0 +1,705 @@ +# +# Copyright (c) 2016 Wind River Systems, Inc. 
+# +# SPDX-License-Identifier: Apache-2.0 +# + +import copy +import contextlib +import functools +import math +import subprocess +import time +import traceback +# noinspection PyUnresolvedReferences +import eventlet +# noinspection PyUnresolvedReferences +from eventlet.semaphore import Semaphore +# noinspection PyUnresolvedReferences +from oslo_log import log as logging +# noinspection PyUnresolvedReferences +from sysinv.conductor.cache_tiering_service_config import ServiceConfig + +from i18n import _LI, _LW, _LE + +import constants +import exception +import ceph + +LOG = logging.getLogger(__name__) +CEPH_POOLS = copy.deepcopy(constants.CEPH_POOLS) + +MAX_WAIT = constants.CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC +MIN_WAIT = constants.CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC + + +class LockOwnership(object): + def __init__(self, sem): + self.sem = sem + + @contextlib.contextmanager + def __call__(self): + try: + yield + finally: + if self.sem: + self.sem.release() + + def transfer(self): + new_lo = LockOwnership(self.sem) + self.sem = None + return new_lo + + +class Lock(object): + + def __init__(self): + self.sem = Semaphore(value=1) + + def try_lock(self): + result = self.sem.acquire(blocking=False) + if result: + return LockOwnership(self.sem) + + +class CacheTiering(object): + + def __init__(self, service): + self.service = service + self.lock = Lock() + # will be unlocked by set_initial_config() + self._init_config_lock = self.lock.try_lock() + self.config = None + self.config_desired = None + self.config_applied = None + self.target_max_bytes = {} + + def set_initial_config(self, config): + with self._init_config_lock(): + LOG.info("Setting Ceph cache tiering initial configuration") + self.config = ServiceConfig.from_dict( + config.get(constants.CACHE_TIERING, {})) or \ + ServiceConfig() + self.config_desired = ServiceConfig.from_dict( + config.get(constants.CACHE_TIERING_DESIRED, {})) or \ + ServiceConfig() + self.config_applied = ServiceConfig.from_dict( + config.get(constants.CACHE_TIERING_APPLIED, {})) or \ + ServiceConfig() + if self.config_desired: + LOG.debug("set_initial_config config_desired %s " % + self.config_desired.to_dict()) + if self.config_applied: + LOG.debug("set_initial_config config_applied %s " % + self.config_applied.to_dict()) + + # Check that previous caching tier operation completed + # successfully or perform recovery + if (self.config_desired and + self.config_applied and + (self.config_desired.cache_enabled != + self.config_applied.cache_enabled)): + if self.config_desired.cache_enabled: + self.enable_cache(self.config_desired.to_dict(), + self.config_applied.to_dict(), + self._init_config_lock.transfer()) + else: + self.disable_cache(self.config_desired.to_dict(), + self.config_applied.to_dict(), + self._init_config_lock.transfer()) + + def is_locked(self): + lock_ownership = self.lock.try_lock() + if not lock_ownership: + return True + with lock_ownership(): + return False + + def update_pools_info(self): + global CEPH_POOLS + cfg = self.service.sysinv_conductor.call( + {}, 'get_ceph_pools_config') + CEPH_POOLS = copy.deepcopy(cfg) + LOG.info(_LI("update_pools_info: pools: {}").format(CEPH_POOLS)) + + def enable_cache(self, new_config, applied_config, lock_ownership=None): + new_config = ServiceConfig.from_dict(new_config) + applied_config = ServiceConfig.from_dict(applied_config) + if not lock_ownership: + lock_ownership = self.lock.try_lock() + if not lock_ownership: + raise exception.CephCacheEnableFailure() + with lock_ownership(): + 
eventlet.spawn(self.do_enable_cache, + new_config, applied_config, + lock_ownership.transfer()) + + def do_enable_cache(self, new_config, applied_config, lock_ownership): + LOG.info(_LI("cache_tiering_enable_cache: " + "new_config={}, applied_config={}").format( + new_config.to_dict(), applied_config.to_dict())) + _unwind_actions = [] + with lock_ownership(): + success = False + _exception = None + try: + self.config_desired.cache_enabled = True + self.update_pools_info() + for pool in CEPH_POOLS: + if (pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or + pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): + object_pool_name = \ + self.service.monitor._get_object_pool_name() + pool['pool_name'] = object_pool_name + + self.cache_pool_create(pool) + _unwind_actions.append( + functools.partial(self.cache_pool_delete, pool)) + for pool in CEPH_POOLS: + if (pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or + pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): + object_pool_name = \ + self.service.monitor._get_object_pool_name() + pool['pool_name'] = object_pool_name + + self.cache_tier_add(pool) + _unwind_actions.append( + functools.partial(self.cache_tier_remove, pool)) + for pool in CEPH_POOLS: + if (pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or + pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): + object_pool_name = \ + self.service.monitor._get_object_pool_name() + pool['pool_name'] = object_pool_name + + self.cache_mode_set(pool, 'writeback') + self.cache_pool_set_config(pool, new_config) + self.cache_overlay_create(pool) + success = True + except Exception as e: + LOG.error(_LE('Failed to enable cache: reason=%s') % + traceback.format_exc()) + for action in reversed(_unwind_actions): + try: + action() + except Exception: + LOG.warn(_LW('Failed cache enable ' + 'unwind action: reason=%s') % + traceback.format_exc()) + success = False + _exception = str(e) + finally: + self.service.monitor.monitor_check_cache_tier(success) + if success: + self.config_applied.cache_enabled = True + self.service.sysinv_conductor.call( + {}, 'cache_tiering_enable_cache_complete', + success=success, exception=_exception, + new_config=new_config.to_dict(), + applied_config=applied_config.to_dict()) + # Run first update of periodic target_max_bytes + self.update_cache_target_max_bytes() + + @contextlib.contextmanager + def ignore_ceph_failure(self): + try: + yield + except exception.CephManagerException: + pass + + def disable_cache(self, new_config, applied_config, lock_ownership=None): + new_config = ServiceConfig.from_dict(new_config) + applied_config = ServiceConfig.from_dict(applied_config) + if not lock_ownership: + lock_ownership = self.lock.try_lock() + if not lock_ownership: + raise exception.CephCacheDisableFailure() + with lock_ownership(): + eventlet.spawn(self.do_disable_cache, + new_config, applied_config, + lock_ownership.transfer()) + + def do_disable_cache(self, new_config, applied_config, lock_ownership): + LOG.info(_LI("cache_tiering_disable_cache: " + "new_config={}, applied_config={}").format( + new_config, applied_config)) + with lock_ownership(): + success = False + _exception = None + try: + self.config_desired.cache_enabled = False + for pool in CEPH_POOLS: + if (pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or + pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): + object_pool_name = \ + 
self.service.monitor._get_object_pool_name() + pool['pool_name'] = object_pool_name + + with self.ignore_ceph_failure(): + self.cache_mode_set( + pool, 'forward') + + for pool in CEPH_POOLS: + if (pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or + pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): + object_pool_name = \ + self.service.monitor._get_object_pool_name() + pool['pool_name'] = object_pool_name + + retries_left = 3 + while True: + try: + self.cache_flush(pool) + break + except exception.CephCacheFlushFailure: + retries_left -= 1 + if not retries_left: + # give up + break + else: + time.sleep(1) + for pool in CEPH_POOLS: + if (pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or + pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): + object_pool_name = \ + self.service.monitor._get_object_pool_name() + pool['pool_name'] = object_pool_name + + with self.ignore_ceph_failure(): + self.cache_overlay_delete(pool) + self.cache_tier_remove(pool) + for pool in CEPH_POOLS: + if (pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or + pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): + object_pool_name = \ + self.service.monitor._get_object_pool_name() + pool['pool_name'] = object_pool_name + + with self.ignore_ceph_failure(): + self.cache_pool_delete(pool) + success = True + except Exception as e: + LOG.warn(_LE('Failed to disable cache: reason=%s') % + traceback.format_exc()) + _exception = str(e) + finally: + self.service.monitor.monitor_check_cache_tier(False) + if success: + self.config_desired.cache_enabled = False + self.config_applied.cache_enabled = False + self.service.sysinv_conductor.call( + {}, 'cache_tiering_disable_cache_complete', + success=success, exception=_exception, + new_config=new_config.to_dict(), + applied_config=applied_config.to_dict()) + + def get_pool_pg_num(self, pool_name): + return self.service.sysinv_conductor.call( + {}, 'get_pool_pg_num', + pool_name=pool_name) + + def cache_pool_create(self, pool): + backing_pool = pool['pool_name'] + cache_pool = backing_pool + '-cache' + pg_num = self.get_pool_pg_num(cache_pool) + if not ceph.osd_pool_exists(self.service.ceph_api, cache_pool): + ceph.osd_pool_create( + self.service.ceph_api, cache_pool, + pg_num, pg_num) + + def cache_pool_delete(self, pool): + cache_pool = pool['pool_name'] + '-cache' + ceph.osd_pool_delete( + self.service.ceph_api, cache_pool) + + def cache_tier_add(self, pool): + backing_pool = pool['pool_name'] + cache_pool = backing_pool + '-cache' + response, body = self.service.ceph_api.osd_tier_add( + backing_pool, cache_pool, + force_nonempty="--force-nonempty", + body='json') + if response.ok: + LOG.info(_LI("Added OSD tier: " + "backing_pool={}, cache_pool={}").format( + backing_pool, cache_pool)) + else: + e = exception.CephPoolAddTierFailure( + backing_pool=backing_pool, + cache_pool=cache_pool, + response_status_code=response.status_code, + response_reason=response.reason, + status=body.get('status'), + output=body.get('output')) + LOG.warn(e) + raise e + + def cache_tier_remove(self, pool): + backing_pool = pool['pool_name'] + cache_pool = backing_pool + '-cache' + response, body = self.service.ceph_api.osd_tier_remove( + backing_pool, cache_pool, body='json') + if response.ok: + LOG.info(_LI("Removed OSD tier: " + "backing_pool={}, cache_pool={}").format( + backing_pool, cache_pool)) + else: + e = exception.CephPoolRemoveTierFailure( + backing_pool=backing_pool, + 
cache_pool=cache_pool, + response_status_code=response.status_code, + response_reason=response.reason, + status=body.get('status'), + output=body.get('output')) + LOG.warn(e) + raise e + + def cache_mode_set(self, pool, mode): + backing_pool = pool['pool_name'] + cache_pool = backing_pool + '-cache' + response, body = self.service.ceph_api.osd_tier_cachemode( + cache_pool, mode, body='json') + if response.ok: + LOG.info(_LI("Set OSD tier cache mode: " + "cache_pool={}, mode={}").format(cache_pool, mode)) + else: + e = exception.CephCacheSetModeFailure( + cache_pool=cache_pool, + mode=mode, + response_status_code=response.status_code, + response_reason=response.reason, + status=body.get('status'), + output=body.get('output')) + LOG.warn(e) + raise e + + def cache_pool_set_config(self, pool, config): + for name, value in config.params.iteritems(): + self.cache_pool_set_param(pool, name, value) + + def cache_pool_set_param(self, pool, name, value): + backing_pool = pool['pool_name'] + cache_pool = backing_pool + '-cache' + ceph.osd_set_pool_param( + self.service.ceph_api, cache_pool, name, value) + + def cache_overlay_create(self, pool): + backing_pool = pool['pool_name'] + cache_pool = backing_pool + '-cache' + response, body = self.service.ceph_api.osd_tier_set_overlay( + backing_pool, cache_pool, body='json') + if response.ok: + LOG.info(_LI("Set OSD tier overlay: " + "backing_pool={}, cache_pool={}").format( + backing_pool, cache_pool)) + else: + e = exception.CephCacheCreateOverlayFailure( + backing_pool=backing_pool, + cache_pool=cache_pool, + response_status_code=response.status_code, + response_reason=response.reason, + status=body.get('status'), + output=body.get('output')) + LOG.warn(e) + raise e + + def cache_overlay_delete(self, pool): + backing_pool = pool['pool_name'] + cache_pool = pool['pool_name'] + response, body = self.service.ceph_api.osd_tier_remove_overlay( + backing_pool, body='json') + if response.ok: + LOG.info(_LI("Removed OSD tier overlay: " + "backing_pool={}").format(backing_pool)) + else: + e = exception.CephCacheDeleteOverlayFailure( + backing_pool=backing_pool, + cache_pool=cache_pool, + response_status_code=response.status_code, + response_reason=response.reason, + status=body.get('status'), + output=body.get('output')) + LOG.warn(e) + raise e + + @staticmethod + def rados_cache_flush_evict_all(pool): + backing_pool = pool['pool_name'] + cache_pool = backing_pool + '-cache' + try: + subprocess.check_call( + ['/usr/bin/rados', '-p', cache_pool, 'cache-flush-evict-all']) + LOG.info(_LI("Flushed OSD cache pool:" + "cache_pool={}").format(cache_pool)) + except subprocess.CalledProcessError as e: + _e = exception.CephCacheFlushFailure( + cache_pool=cache_pool, + return_code=str(e.returncode), + cmd=" ".join(e.cmd), + output=e.output) + LOG.warn(_e) + raise _e + + def cache_flush(self, pool): + backing_pool = pool['pool_name'] + cache_pool = backing_pool + '-cache' + try: + # set target_max_objects to a small value to force evacuation of + # objects from cache before we use rados cache-flush-evict-all + # WARNING: assuming cache_pool will be deleted after flush so + # we don't have to save/restore the value of target_max_objects + # + self.cache_pool_set_param(pool, 'target_max_objects', 1) + prev_object_count = None + wait_interval = MIN_WAIT + while True: + response, body = self.service.ceph_api.df(body='json') + if not response.ok: + LOG.warn(_LW( + "Failed to retrieve cluster free space stats: " + "status_code=%d, reason=%s") % ( + response.status_code, 
response.reason)) + break + stats = None + for s in body['output']['pools']: + if s['name'] == cache_pool: + stats = s['stats'] + break + if not stats: + LOG.warn(_LW("Missing pool free space stats: " + "cache_pool=%s") % cache_pool) + break + object_count = stats['objects'] + if object_count < constants.CACHE_FLUSH_OBJECTS_THRESHOLD: + break + if prev_object_count is not None: + delta_objects = object_count - prev_object_count + if delta_objects > 0: + LOG.warn(_LW("Unexpected increase in number " + "of objects in cache pool: " + "cache_pool=%s, prev_object_count=%d, " + "object_count=%d") % ( + cache_pool, prev_object_count, + object_count)) + break + if delta_objects == 0: + wait_interval *= 2 + if wait_interval > MAX_WAIT: + LOG.warn(_LW( + "Cache pool number of objects did not " + "decrease: cache_pool=%s, object_count=%d, " + "wait_interval=%d") % ( + cache_pool, object_count, wait_interval)) + break + else: + wait_interval = MIN_WAIT + time.sleep(wait_interval) + prev_object_count = object_count + except exception.CephPoolSetParamFailure as e: + LOG.warn(e) + finally: + self.rados_cache_flush_evict_all(pool) + + def update_cache_target_max_bytes(self): + "Dynamically compute target_max_bytes of caching pools" + + # Only compute if cache tiering is enabled + if self.config_applied and self.config_desired: + if (not self.config_desired.cache_enabled or + not self.config_applied.cache_enabled): + LOG.debug("Cache tiering disabled, no need to update " + "target_max_bytes.") + return + LOG.debug("Updating target_max_bytes") + + # Get available space + response, body = self.service.ceph_api.osd_df(body='json', + output_method='tree') + if not response.ok: + LOG.warn(_LW( + "Failed to retrieve cluster free space stats: " + "status_code=%d, reason=%s") % ( + response.status_code, response.reason)) + return + + storage_tier_size = 0 + cache_tier_size = 0 + + replication = constants.CEPH_REPLICATION_FACTOR + for node in body['output']['nodes']: + if node['name'] == 'storage-tier': + storage_tier_size = node['kb']*1024/replication + elif node['name'] == 'cache-tier': + cache_tier_size = node['kb']*1024/replication + + if storage_tier_size == 0 or cache_tier_size == 0: + LOG.info("Failed to get cluster size " + "(storage_tier_size=%s, cache_tier_size=%s)," + "retrying on next cycle" % + (storage_tier_size, cache_tier_size)) + return + + # Get available pools + response, body = self.service.ceph_api.osd_lspools(body='json') + if not response.ok: + LOG.warn(_LW( + "Failed to retrieve available pools: " + "status_code=%d, reason=%s") % ( + response.status_code, response.reason)) + return + pools = [p['poolname'] for p in body['output']] + + # Separate backing from caching for easy iteration + backing_pools = [] + caching_pools = [] + for p in pools: + if p.endswith('-cache'): + caching_pools.append(p) + else: + backing_pools.append(p) + LOG.debug("Pools: caching: %s, backing: %s" % (caching_pools, + backing_pools)) + + if not len(caching_pools): + # We do not have caching pools created yet + return + + # Get quota from backing pools that are cached + stats = {} + for p in caching_pools: + backing_name = p.replace('-cache', '') + stats[backing_name] = {} + try: + quota = ceph.osd_pool_get_quota(self.service.ceph_api, + backing_name) + except exception.CephPoolGetQuotaFailure as e: + LOG.warn(_LW( + "Failed to retrieve quota: " + "exception: %s") % str(e)) + return + stats[backing_name]['quota'] = quota['max_bytes'] + stats[backing_name]['quota_pt'] = (quota['max_bytes']*100.0 / + storage_tier_size) 
+ LOG.debug("Quota for pool: %s " + "is: %s B representing %s pt" % + (backing_name, + quota['max_bytes'], + stats[backing_name]['quota_pt'])) + + # target_max_bytes logic: + # - For computing target_max_bytes cache_tier_size must be equal than + # the sum of target_max_bytes of each caching pool + # - target_max_bytes for each caching pool is computed as the + # percentage of quota in corresponding backing pool + # - the caching tiers has to work at full capacity, so if the sum of + # all quotas in the backing tier is different than 100% we need to + # normalize + # - if the quota is zero for any pool we add CACHE_TIERING_MIN_QUOTA + # by default *after* normalization so that we have real minimum + + # We compute the real percentage that need to be normalized after + # ensuring that we have CACHE_TIERING_MIN_QUOTA for each pool with + # a quota of 0 + real_100pt = 90.0 # we start from max and decrease it for each 0 pool + # Note: We must avoid reaching 100% at all costs! and + # cache_target_full_ratio, the Ceph parameter that is supposed to + # protect the cluster against this does not work in Ceph v0.94.6! + # Therefore a value of 90% is better suited for this + for p in caching_pools: + backing_name = p.replace('-cache', '') + if stats[backing_name]['quota_pt'] == 0: + real_100pt -= constants.CACHE_TIERING_MIN_QUOTA + LOG.debug("Quota before normalization for %s is: %s pt" % + (p, stats[backing_name]['quota_pt'])) + + # Compute total percentage of quotas for all backing pools. + # Should be 100% if correctly configured + total_quota_pt = 0 + for p in caching_pools: + backing_name = p.replace('-cache', '') + total_quota_pt += stats[backing_name]['quota_pt'] + LOG.debug("Total quota pt is: %s" % total_quota_pt) + + # Normalize quota pt to 100% (or real_100pt) + if total_quota_pt != 0: # to avoid divide by zero + for p in caching_pools: + backing_name = p.replace('-cache', '') + stats[backing_name]['quota_pt'] = \ + (stats[backing_name]['quota_pt'] * + (real_100pt / total_quota_pt)) + + # Do not allow quota to be 0 for any pool + total = 0 + for p in caching_pools: + backing_name = p.replace('-cache', '') + if stats[backing_name]['quota_pt'] == 0: + stats[backing_name]['quota_pt'] = \ + constants.CACHE_TIERING_MIN_QUOTA + total += stats[backing_name]['quota_pt'] + LOG.debug("Quota after normalization for %s is: %s:" % + (p, stats[backing_name]['quota_pt'])) + + if total > 100: + # Supplementary protection, we really have to avoid going above + # 100%. Note that real_100pt is less than 100% but we still got + # more than 100! + LOG.warn("Total sum of quotas should not go above 100% " + "but is: %s, recalculating in next cycle" % total) + return + LOG.debug("Total sum of quotas is %s pt" % total) + + # Get current target_max_bytes. We cache it to reduce requests + # to ceph-rest-api. We are the ones changing it, so not an issue. 
+ for p in caching_pools: + if p not in self.target_max_bytes: + try: + value = ceph.osd_get_pool_param(self.service.ceph_api, p, + constants.TARGET_MAX_BYTES) + except exception.CephPoolGetParamFailure as e: + LOG.warn(e) + return + self.target_max_bytes[p] = value + LOG.debug("Existing target_max_bytes got from " + "Ceph: %s" % self.target_max_bytes) + + # Set TARGET_MAX_BYTES + LOG.debug("storage_tier_size: %s " + "cache_tier_size: %s" % (storage_tier_size, + cache_tier_size)) + for p in caching_pools: + backing_name = p.replace('-cache', '') + s = stats[backing_name] + target_max_bytes = math.floor(s['quota_pt'] * cache_tier_size / + 100.0) + target_max_bytes = int(target_max_bytes) + LOG.debug("New Target max bytes of pool: %s is: %s B" % ( + p, target_max_bytes)) + + # Set the new target_max_bytes only if it changed + if self.target_max_bytes.get(p) == target_max_bytes: + LOG.debug("Target max bytes of pool: %s " + "is already updated" % p) + continue + try: + ceph.osd_set_pool_param(self.service.ceph_api, p, + constants.TARGET_MAX_BYTES, + target_max_bytes) + self.target_max_bytes[p] = target_max_bytes + except exception.CephPoolSetParamFailure as e: + LOG.warn(e) + continue + return diff --git a/ceph/ceph-manager/ceph-manager/ceph_manager/ceph.py b/ceph/ceph-manager/ceph-manager/ceph_manager/ceph.py new file mode 100644 index 00000000..dff3c8ab --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/ceph_manager/ceph.py @@ -0,0 +1,164 @@ +# +# Copyright (c) 2016-2018 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + +import exception +from i18n import _LI +# noinspection PyUnresolvedReferences +from oslo_log import log as logging + + +LOG = logging.getLogger(__name__) + + +def osd_pool_set_quota(ceph_api, pool_name, max_bytes=0, max_objects=0): + """Set the quota for an OSD pool_name + Setting max_bytes or max_objects to 0 will disable that quota param + :param pool_name: OSD pool_name + :param max_bytes: maximum bytes for OSD pool_name + :param max_objects: maximum objects for OSD pool_name + """ + + # Update quota if needed + prev_quota = osd_pool_get_quota(ceph_api, pool_name) + if prev_quota["max_bytes"] != max_bytes: + resp, b = ceph_api.osd_set_pool_quota(pool_name, 'max_bytes', + max_bytes, body='json') + if resp.ok: + LOG.info(_LI("Set OSD pool_name quota: " + "pool_name={}, max_bytes={}").format( + pool_name, max_bytes)) + else: + e = exception.CephPoolSetQuotaFailure( + pool=pool_name, name='max_bytes', + value=max_bytes, reason=resp.reason) + LOG.error(e) + raise e + if prev_quota["max_objects"] != max_objects: + resp, b = ceph_api.osd_set_pool_quota(pool_name, 'max_objects', + max_objects, + body='json') + if resp.ok: + LOG.info(_LI("Set OSD pool_name quota: " + "pool_name={}, max_objects={}").format( + pool_name, max_objects)) + else: + e = exception.CephPoolSetQuotaFailure( + pool=pool_name, name='max_objects', + value=max_objects, reason=resp.reason) + LOG.error(e) + raise e + + +def osd_pool_get_quota(ceph_api, pool_name): + resp, quota = ceph_api.osd_get_pool_quota(pool_name, body='json') + if not resp.ok: + e = exception.CephPoolGetQuotaFailure( + pool=pool_name, reason=resp.reason) + LOG.error(e) + raise e + else: + return {"max_objects": quota["output"]["quota_max_objects"], + "max_bytes": quota["output"]["quota_max_bytes"]} + + +def osd_pool_exists(ceph_api, pool_name): + response, body = ceph_api.osd_pool_get( + pool_name, "pg_num", body='json') + if response.ok: + return True + return False + + +def osd_pool_create(ceph_api, pool_name, 
pg_num, pgp_num): + if pool_name.endswith("-cache"): + # ruleset 1: is the ruleset for the cache tier + # Name: cache_tier_ruleset + ruleset = 1 + else: + # ruleset 0: is the default ruleset if no crushmap is loaded or + # the ruleset for the backing tier if loaded: + # Name: storage_tier_ruleset + ruleset = 0 + response, body = ceph_api.osd_pool_create( + pool_name, pg_num, pgp_num, pool_type="replicated", + ruleset=ruleset, body='json') + if response.ok: + LOG.info(_LI("Created OSD pool: " + "pool_name={}, pg_num={}, pgp_num={}, " + "pool_type=replicated, ruleset={}").format( + pool_name, pg_num, pgp_num, ruleset)) + else: + e = exception.CephPoolCreateFailure( + name=pool_name, reason=response.reason) + LOG.error(e) + raise e + + # Explicitly assign the ruleset to the pool on creation since it is + # ignored in the create call + response, body = ceph_api.osd_set_pool_param( + pool_name, "crush_ruleset", ruleset, body='json') + if response.ok: + LOG.info(_LI("Assigned crush ruleset to OS pool: " + "pool_name={}, ruleset={}").format( + pool_name, ruleset)) + else: + e = exception.CephPoolRulesetFailure( + name=pool_name, reason=response.reason) + LOG.error(e) + ceph_api.osd_pool_delete( + pool_name, pool_name, + sure='--yes-i-really-really-mean-it', + body='json') + raise e + + +def osd_pool_delete(ceph_api, pool_name): + """Delete an osd pool + :param pool_name: pool name + """ + response, body = ceph_api.osd_pool_delete( + pool_name, pool_name, + sure='--yes-i-really-really-mean-it', + body='json') + if response.ok: + LOG.info(_LI("Deleted OSD pool {}").format(pool_name)) + else: + e = exception.CephPoolDeleteFailure( + name=pool_name, reason=response.reason) + LOG.warn(e) + raise e + + +def osd_set_pool_param(ceph_api, pool_name, param, value): + response, body = ceph_api.osd_set_pool_param( + pool_name, param, value, + force=None, body='json') + if response.ok: + LOG.info('OSD set pool param: ' + 'pool={}, name={}, value={}'.format( + pool_name, param, value)) + else: + raise exception.CephPoolSetParamFailure( + pool_name=pool_name, + param=param, + value=str(value), + reason=response.reason) + return response, body + + +def osd_get_pool_param(ceph_api, pool_name, param): + response, body = ceph_api.osd_get_pool_param( + pool_name, param, body='json') + if response.ok: + LOG.debug('OSD get pool param: ' + 'pool={}, name={}, value={}'.format( + pool_name, param, body['output'][param])) + else: + raise exception.CephPoolGetParamFailure( + pool_name=pool_name, + param=param, + reason=response.reason) + return body['output'][param] diff --git a/ceph/ceph-manager/ceph-manager/ceph_manager/constants.py b/ceph/ceph-manager/ceph-manager/ceph_manager/constants.py new file mode 100644 index 00000000..5b297743 --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/ceph_manager/constants.py @@ -0,0 +1,107 @@ +# +# Copyright (c) 2016-2018 Wind River Systems, Inc. 
+# +# SPDX-License-Identifier: Apache-2.0 +# + +from i18n import _ +# noinspection PyUnresolvedReferences +from sysinv.common import constants as sysinv_constants + +CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL = \ + sysinv_constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL +CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER = \ + sysinv_constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER +CEPH_POOLS = sysinv_constants.BACKING_POOLS +CEPH_REPLICATION_FACTOR = sysinv_constants.CEPH_REPLICATION_FACTOR_DEFAULT +SERVICE_PARAM_CEPH_CACHE_HIT_SET_TYPE_BLOOM = \ + sysinv_constants.SERVICE_PARAM_CEPH_CACHE_HIT_SET_TYPE_BLOOM +CACHE_TIERING_DEFAULTS = sysinv_constants.CACHE_TIERING_DEFAULTS +TARGET_MAX_BYTES = \ + sysinv_constants.SERVICE_PARAM_CEPH_CACHE_TIER_TARGET_MAX_BYTES + +# Cache tiering section shortener +CACHE_TIERING = \ + sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER +CACHE_TIERING_DESIRED = \ + sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED +CACHE_TIERING_APPLIED = \ + sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED +CACHE_TIERING_SECTIONS = \ + [CACHE_TIERING, CACHE_TIERING_DESIRED, CACHE_TIERING_APPLIED] + +# Cache flush parameters +CACHE_FLUSH_OBJECTS_THRESHOLD = 1000 +CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC = 1 +CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC = 128 + +CACHE_TIERING_MIN_QUOTA = 5 + +FM_ALARM_REASON_MAX_SIZE = 256 + +# TODO this will later change based on parsed health +# clock skew is vm malfunction, mon or osd is equipment mal +ALARM_CAUSE = 'equipment-malfunction' +ALARM_TYPE = 'equipment' + +# Ceph health check interval (in seconds) +CEPH_HEALTH_CHECK_INTERVAL = 60 + +# Ceph health statuses +CEPH_HEALTH_OK = 'HEALTH_OK' +CEPH_HEALTH_WARN = 'HEALTH_WARN' +CEPH_HEALTH_ERR = 'HEALTH_ERR' +CEPH_HEALTH_DOWN = 'CEPH_DOWN' + +# Statuses not reported by Ceph +CEPH_STATUS_CUSTOM = [CEPH_HEALTH_DOWN] + +SEVERITY = {CEPH_HEALTH_DOWN: 'critical', + CEPH_HEALTH_ERR: 'critical', + CEPH_HEALTH_WARN: 'warning'} + +SERVICE_AFFECTING = {CEPH_HEALTH_DOWN: True, + CEPH_HEALTH_ERR: True, + CEPH_HEALTH_WARN: False} + +# TODO this will later change based on parsed health +ALARM_REASON_NO_OSD = _('no OSDs') +ALARM_REASON_OSDS_DOWN = _('OSDs are down') +ALARM_REASON_OSDS_OUT = _('OSDs are out') +ALARM_REASON_OSDS_DOWN_OUT = _('OSDs are down/out') +ALARM_REASON_PEER_HOST_DOWN = _('peer host down') + +REPAIR_ACTION_MAJOR_CRITICAL_ALARM = _( + 'Ensure storage hosts from replication group are unlocked and available.' + 'Check if OSDs of each storage host are up and running.' 
+ 'If problem persists, contact next level of support.') +REPAIR_ACTION = _('If problem persists, contact next level of support.') + +SYSINV_CONDUCTOR_TOPIC = 'sysinv.conductor_manager' +CEPH_MANAGER_TOPIC = 'sysinv.ceph_manager' +SYSINV_CONFIG_FILE = '/etc/sysinv/sysinv.conf' + +# Titanium Cloud version strings +TITANIUM_SERVER_VERSION_16_10 = '16.10' + +CEPH_HEALTH_WARN_REQUIRE_JEWEL_OSDS_NOT_SET = ( + "all OSDs are running jewel or later but the " + "'require_jewel_osds' osdmap flag is not set") + +UPGRADE_COMPLETED = \ + sysinv_constants.UPGRADE_COMPLETED +UPGRADE_ABORTING = \ + sysinv_constants.UPGRADE_ABORTING +UPGRADE_ABORT_COMPLETING = \ + sysinv_constants.UPGRADE_ABORT_COMPLETING +UPGRADE_ABORTING_ROLLBACK = \ + sysinv_constants.UPGRADE_ABORTING_ROLLBACK + +CEPH_FLAG_REQUIRE_JEWEL_OSDS = 'require_jewel_osds' + +# Tiers +CEPH_CRUSH_TIER_SUFFIX = sysinv_constants.CEPH_CRUSH_TIER_SUFFIX +SB_TIER_TYPE_CEPH = sysinv_constants.SB_TIER_TYPE_CEPH +SB_TIER_SUPPORTED = sysinv_constants.SB_TIER_SUPPORTED +SB_TIER_DEFAULT_NAMES = sysinv_constants.SB_TIER_DEFAULT_NAMES +SB_TIER_CEPH_POOLS = sysinv_constants.SB_TIER_CEPH_POOLS diff --git a/ceph/ceph-manager/ceph-manager/ceph_manager/exception.py b/ceph/ceph-manager/ceph-manager/ceph_manager/exception.py new file mode 100644 index 00000000..c2d81b8b --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/ceph_manager/exception.py @@ -0,0 +1,130 @@ +# +# Copyright (c) 2016-2017 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + +# noinspection PyUnresolvedReferences +from i18n import _, _LW +# noinspection PyUnresolvedReferences +from oslo_log import log as logging + + +LOG = logging.getLogger(__name__) + + +class CephManagerException(Exception): + message = _("An unknown exception occurred.") + + def __init__(self, message=None, **kwargs): + self.kwargs = kwargs + if not message: + try: + message = self.message % kwargs + except TypeError: + LOG.warn(_LW('Exception in string format operation')) + for name, value in kwargs.iteritems(): + LOG.error("%s: %s" % (name, value)) + # at least get the core message out if something happened + message = self.message + super(CephManagerException, self).__init__(message) + + +class CephPoolSetQuotaFailure(CephManagerException): + message = _("Error setting the OSD pool " + "quota %(name)s for %(pool)s to %(value)s") \ + + ": %(reason)s" + + +class CephPoolGetQuotaFailure(CephManagerException): + message = _("Error getting the OSD pool quota for %(pool)s") \ + + ": %(reason)s" + + +class CephPoolCreateFailure(CephManagerException): + message = _("Creating OSD pool %(name)s failed: %(reason)s") + + +class CephPoolDeleteFailure(CephManagerException): + message = _("Deleting OSD pool %(name)s failed: %(reason)s") + + +class CephPoolRulesetFailure(CephManagerException): + message = _("Assigning crush ruleset to OSD " + "pool %(name)s failed: %(reason)s") + + +class CephPoolAddTierFailure(CephManagerException): + message = _("Failed to add OSD tier: " + "backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, " + "response=%(response_status_code)s:%(response_reason)s, " + "status=%(status)s, output=%(output)s") + + +class CephPoolRemoveTierFailure(CephManagerException): + message = _("Failed to remove tier: " + "backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, " + "response=%(response_status_code)s:%(response_reason)s, " + "status=%(status)s, output=%(output)s") + + +class CephCacheSetModeFailure(CephManagerException): + message = _("Failed to set OSD tier cache mode: " +
"cache_pool=%(cache_pool)s, mode=%(mode)s, " + "response=%(response_status_code)s:%(response_reason)s, " + "status=%(status)s, output=%(output)s") + + +class CephPoolSetParamFailure(CephManagerException): + message = _("Cannot set Ceph OSD pool parameter: " + "pool_name=%(pool_name)s, param=%(param)s, value=%(value)s. " + "Reason: %(reason)s") + + +class CephPoolGetParamFailure(CephManagerException): + message = _("Cannot get Ceph OSD pool parameter: " + "pool_name=%(pool_name)s, param=%(param)s. " + "Reason: %(reason)s") + + +class CephCacheCreateOverlayFailure(CephManagerException): + message = _("Failed to create overlay: " + "backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, " + "response=%(response_status_code)s:%(response_reason)s, " + "status=%(status)s, output=%(output)s") + + +class CephCacheDeleteOverlayFailure(CephManagerException): + message = _("Failed to delete overlay: " + "backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, " + "response=%(response_status_code)s:%(response_reason)s, " + "status=%(status)s, output=%(output)s") + + +class CephCacheFlushFailure(CephManagerException): + message = _("Failed to flush cache pool: " + "cache_pool=%(cache_pool)s, " + "return_code=%(return_code)s, " + "cmd=%(cmd)s, output=%(output)s") + + +class CephCacheEnableFailure(CephManagerException): + message = _("Cannot enable Ceph cache tier. " + "Reason: cache tiering operation in progress.") + + +class CephCacheDisableFailure(CephManagerException): + message = _("Cannot disable Ceph cache tier. " + "Reason: cache tiering operation in progress.") + + +class CephSetKeyFailure(CephManagerException): + message = _("Error setting the Ceph flag " + "'%(flag)s' %(extra)s: " + "response=%(response_status_code)s:%(response_reason)s, " + "status=%(status)s, output=%(output)s") + + +class CephApiFailure(CephManagerException): + message = _("API failure: " + "call=%(call)s, reason=%(reason)s") diff --git a/ceph/ceph-manager/ceph-manager/ceph_manager/i18n.py b/ceph/ceph-manager/ceph-manager/ceph_manager/i18n.py new file mode 100644 index 00000000..67977cea --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/ceph_manager/i18n.py @@ -0,0 +1,15 @@ +# +# Copyright (c) 2016 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +import oslo_i18n + +DOMAIN = 'ceph-manager' + +_translators = oslo_i18n.TranslatorFactory(domain=DOMAIN) +_ = _translators.primary + +_LI = _translators.log_info +_LW = _translators.log_warning +_LE = _translators.log_error diff --git a/ceph/ceph-manager/ceph-manager/ceph_manager/monitor.py b/ceph/ceph-manager/ceph-manager/ceph_manager/monitor.py new file mode 100644 index 00000000..941e5fc0 --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/ceph_manager/monitor.py @@ -0,0 +1,893 @@ +# +# Copyright (c) 2013-2018 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + +import time + +# noinspection PyUnresolvedReferences +from fm_api import fm_api +# noinspection PyUnresolvedReferences +from fm_api import constants as fm_constants +# noinspection PyUnresolvedReferences +from oslo_log import log as logging + +from sysinv.conductor.cache_tiering_service_config import ServiceConfig + +# noinspection PyProtectedMember +from i18n import _, _LI, _LW, _LE + +import constants +import exception + +LOG = logging.getLogger(__name__) + + +# When upgrading from 16.10 to 17.x Ceph goes from Hammer release +# to Jewel release. 
After all storage nodes are upgraded to 17.x +# the cluster is in HEALTH_WARN until administrator explicitly +# enables require_jewel_osds flag - which signals Ceph that it +# can safely transition from Hammer to Jewel +# +# This class is needed only when upgrading from 16.10 to 17.x +# TODO: remove it after 1st 17.x release +# +class HandleUpgradesMixin(object): + + def __init__(self, service): + self.service = service + self.surpress_require_jewel_osds_warning = False + + def setup(self, config): + self._set_upgrade(self.service.retry_get_software_upgrade_status()) + + def _set_upgrade(self, upgrade): + state = upgrade.get('state') + from_version = upgrade.get('from_version') + if (state + and state != constants.UPGRADE_COMPLETED + and from_version == constants.TITANIUM_SERVER_VERSION_16_10): + LOG.info(_LI("Surpress require_jewel_osds health warning")) + self.surpress_require_jewel_osds_warning = True + + def set_flag_require_jewel_osds(self): + try: + response, body = self.service.ceph_api.osd_set_key( + constants.CEPH_FLAG_REQUIRE_JEWEL_OSDS, + body='json') + LOG.info(_LI("Set require_jewel_osds flag")) + except IOError as e: + raise exception.CephApiFailure( + call="osd_set_key", + reason=e.message) + else: + if not response.ok: + raise exception.CephSetKeyFailure( + flag=constants.CEPH_FLAG_REQUIRE_JEWEL_OSDS, + extra=_("needed to complete upgrade to Jewel"), + response_status_code=response.status_code, + response_reason=response.reason, + status=body.get('status'), + output=body.get('output')) + + def filter_health_status(self, health): + health = self.auto_heal(health) + # filter out require_jewel_osds warning + # + if not self.surpress_require_jewel_osds_warning: + return health + if health['health'] != constants.CEPH_HEALTH_WARN: + return health + if (constants.CEPH_HEALTH_WARN_REQUIRE_JEWEL_OSDS_NOT_SET + not in health['detail']): + return health + return self._remove_require_jewel_osds_warning(health) + + def _remove_require_jewel_osds_warning(self, health): + reasons_list = [] + for reason in health['detail'].split(';'): + reason = reason.strip() + if len(reason) == 0: + continue + if constants.CEPH_HEALTH_WARN_REQUIRE_JEWEL_OSDS_NOT_SET in reason: + continue + reasons_list.append(reason) + if len(reasons_list) == 0: + health = { + 'health': constants.CEPH_HEALTH_OK, + 'detail': ''} + else: + health['detail'] = '; '.join(reasons_list) + return health + + def auto_heal(self, health): + if (health['health'] == constants.CEPH_HEALTH_WARN + and (constants.CEPH_HEALTH_WARN_REQUIRE_JEWEL_OSDS_NOT_SET + in health['detail'])): + try: + upgrade = self.service.get_software_upgrade_status() + except Exception as ex: + LOG.warn(_LW( + "Getting software upgrade status failed " + "with: %s. 
Skip auto-heal attempt " + "(will retry on next ceph status poll).") % str(ex)) + return + state = upgrade.get('state') + # surpress require_jewel_osds in case upgrade is + # in progress but not completed or aborting + if (not self.surpress_require_jewel_osds_warning + and (upgrade.get('from_version') + == constants.TITANIUM_SERVER_VERSION_16_10) + and state not in [ + None, + constants.UPGRADE_COMPLETED, + constants.UPGRADE_ABORTING, + constants.UPGRADE_ABORT_COMPLETING, + constants.UPGRADE_ABORTING_ROLLBACK]): + LOG.info(_LI("Surpress require_jewel_osds health warning")) + self.surpress_require_jewel_osds_warning = True + # set require_jewel_osds in case upgrade is + # not in progress or completed + if (state in [None, constants.UPGRADE_COMPLETED]): + LOG.warn(_LW( + "No upgrade in progress or update completed " + "and require_jewel_osds health warning raised. " + "Set require_jewel_osds flag.")) + self.set_flag_require_jewel_osds() + health = self._remove_require_jewel_osds_warning(health) + LOG.info(_LI("Unsurpress require_jewel_osds health warning")) + self.surpress_require_jewel_osds_warning = False + # unsurpress require_jewel_osds in case upgrade + # is aborting + if (self.surpress_require_jewel_osds_warning + and state in [ + constants.UPGRADE_ABORTING, + constants.UPGRADE_ABORT_COMPLETING, + constants.UPGRADE_ABORTING_ROLLBACK]): + LOG.info(_LI("Unsurpress require_jewel_osds health warning")) + self.surpress_require_jewel_osds_warning = False + return health + + +class Monitor(HandleUpgradesMixin): + + def __init__(self, service): + self.service = service + self.current_ceph_health = "" + self.cache_enabled = False + self.tiers_size = {} + self.known_object_pool_name = None + self.primary_tier_name = constants.SB_TIER_DEFAULT_NAMES[ + constants.SB_TIER_TYPE_CEPH] + constants.CEPH_CRUSH_TIER_SUFFIX + self.cluster_is_up = False + super(Monitor, self).__init__(service) + + def setup(self, config): + self.set_caching_tier_config(config) + super(Monitor, self).setup(config) + + def set_caching_tier_config(self, config): + conf = ServiceConfig().from_dict( + config.get(constants.CACHE_TIERING_APPLIED)) + if conf: + self.cache_enabled = conf.cache_enabled + + def monitor_check_cache_tier(self, enable_flag): + LOG.info(_LI("monitor_check_cache_tier: " + "enable_flag={}".format(enable_flag))) + self.cache_enabled = enable_flag + + def run(self): + # Wait until Ceph cluster is up and we can get the fsid + while True: + self.ceph_get_fsid() + if self.service.entity_instance_id: + break + time.sleep(constants.CEPH_HEALTH_CHECK_INTERVAL) + + # Start monitoring ceph status + while True: + self.ceph_poll_status() + self.ceph_poll_quotas() + time.sleep(constants.CEPH_HEALTH_CHECK_INTERVAL) + + def ceph_get_fsid(self): + # Check whether an alarm has already been raised + self._get_current_alarms() + if self.current_health_alarm: + LOG.info(_LI("Current alarm: %s") % + str(self.current_health_alarm.__dict__)) + + fsid = self._get_fsid() + if not fsid: + # Raise alarm - it will not have an entity_instance_id + self._report_fault({'health': constants.CEPH_HEALTH_DOWN, + 'detail': 'Ceph cluster is down.'}, + fm_constants.FM_ALARM_ID_STORAGE_CEPH) + else: + # Clear alarm with no entity_instance_id + self._clear_fault(fm_constants.FM_ALARM_ID_STORAGE_CEPH) + self.service.entity_instance_id = 'cluster=%s' % fsid + + def ceph_poll_status(self): + # get previous data every time in case: + # * daemon restarted + # * alarm was cleared manually but stored as raised in daemon + self._get_current_alarms() + if 
self.current_health_alarm: + LOG.info(_LI("Current alarm: %s") % + str(self.current_health_alarm.__dict__)) + + # get ceph health + health = self._get_health() + LOG.info(_LI("Current Ceph health: " + "%(health)s detail: %(detail)s") % health) + + health = self.filter_health_status(health) + if health['health'] != constants.CEPH_HEALTH_OK: + self._report_fault(health, fm_constants.FM_ALARM_ID_STORAGE_CEPH) + self._report_alarm_osds_health() + else: + self._clear_fault(fm_constants.FM_ALARM_ID_STORAGE_CEPH) + self.clear_all_major_critical() + + def filter_health_status(self, health): + return super(Monitor, self).filter_health_status(health) + + def ceph_poll_quotas(self): + self._get_current_alarms() + if self.current_quota_alarms: + LOG.info(_LI("Current quota alarms %s") % + self.current_quota_alarms) + + # Get current current size of each tier + previous_tiers_size = self.tiers_size + self.tiers_size = self._get_tiers_size() + + # Make sure any removed tiers have the alarms cleared + for t in (set(previous_tiers_size)-set(self.tiers_size)): + self._clear_fault(fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE, + "{0}.tier={1}".format( + self.service.entity_instance_id, + t[:-len(constants.CEPH_CRUSH_TIER_SUFFIX)])) + + # Check the quotas on each tier + for tier in self.tiers_size: + # TODO(rchurch): For R6 remove the tier from the default crushmap + # and remove this check. No longer supporting this tier in R5 + if tier == 'cache-tier': + continue + + # Extract the tier name from the crush equivalent + tier_name = tier[:-len(constants.CEPH_CRUSH_TIER_SUFFIX)] + + if self.tiers_size[tier] == 0: + LOG.info(_LI("'%s' tier cluster size not yet available") + % tier_name) + continue + + pools_quota_sum = 0 + if tier == self.primary_tier_name: + for pool in constants.CEPH_POOLS: + if (pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or + pool['pool_name'] == + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): + object_pool_name = self._get_object_pool_name() + if object_pool_name is None: + LOG.error("Rados gateway object data pool does " + "not exist.") + else: + pools_quota_sum += \ + self._get_osd_pool_quota(object_pool_name) + else: + pools_quota_sum += self._get_osd_pool_quota( + pool['pool_name']) + else: + for pool in constants.SB_TIER_CEPH_POOLS: + pool_name = "{0}-{1}".format(pool['pool_name'], tier_name) + pools_quota_sum += self._get_osd_pool_quota(pool_name) + + # Currently, there is only one pool on the addtional tier(s), + # therefore allow a quota of 0 + if (pools_quota_sum != self.tiers_size[tier] and + pools_quota_sum != 0): + self._report_fault( + {'tier_name': tier_name, + 'tier_eid': "{0}.tier={1}".format( + self.service.entity_instance_id, + tier_name)}, + fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE) + else: + self._clear_fault( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE, + "{0}.tier={1}".format(self.service.entity_instance_id, + tier_name)) + + # CEPH HELPERS + + def _get_fsid(self): + try: + response, fsid = self.service.ceph_api.fsid( + body='text', timeout=30) + except IOError as e: + LOG.warning(_LW("ceph_api.fsid failed: %s") % str(e.message)) + self.cluster_is_up = False + return None + + if not response.ok: + LOG.warning(_LW("Get fsid failed: %s") % response.reason) + self.cluster_is_up = False + return None + + self.cluster_is_up = True + return fsid.strip() + + def _get_health(self): + try: + # we use text since it has all info + response, body = self.service.ceph_api.health( + body='text', timeout=30) + except IOError as e: + 
LOG.warning(_LW("ceph_api.health failed: %s") % str(e.message)) + self.cluster_is_up = False + return {'health': constants.CEPH_HEALTH_DOWN, + 'detail': 'Ceph cluster is down.'} + + if not response.ok: + LOG.warning(_LW("CEPH health check failed: %s") % response.reason) + health_info = [constants.CEPH_HEALTH_DOWN, response.reason] + self.cluster_is_up = False + else: + health_info = body.split(' ', 1) + self.cluster_is_up = True + + health = health_info[0] + + if len(health_info) > 1: + detail = health_info[1] + else: + detail = health_info[0] + + return {'health': health.strip(), + 'detail': detail.strip()} + + def _get_object_pool_name(self): + if self.known_object_pool_name is None: + response, body = self.service.ceph_api.osd_pool_get( + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL, + "pg_num", + body='json') + + if response.ok: + self.known_object_pool_name = \ + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL + return self.known_object_pool_name + + response, body = self.service.ceph_api.osd_pool_get( + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER, + "pg_num", + body='json') + + if response.ok: + self.known_object_pool_name = \ + constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER + return self.known_object_pool_name + + return self.known_object_pool_name + + def _get_osd_pool_quota(self, pool_name): + try: + resp, quota = self.service.ceph_api.osd_get_pool_quota( + pool_name, body='json') + except IOError: + return 0 + + if not resp.ok: + LOG.error(_LE("Getting the quota for " + "%(name)s pool failed:%(reason)s)") % + {"name": pool_name, "reason": resp.reason}) + return 0 + else: + try: + quota_gib = int(quota["output"]["quota_max_bytes"])/(1024**3) + return quota_gib + except IOError: + return 0 + + # we have two root nodes 'cache-tier' and 'storage-tier' + # to calculate the space that is used by the pools, we must only + # use 'storage-tier' + # this function determines if a certain node is under a certain + # tree + def host_is_in_root(self, search_tree, node, root_name): + if node['type'] == 'root': + if node['name'] == root_name: + return True + else: + return False + return self.host_is_in_root(search_tree, + search_tree[node['parent']], + root_name) + + # The information received from ceph is not properly + # structured for efficient parsing and searching, so + # it must be processed and transformed into a more + # structured form. + # + # Input received from ceph is an array of nodes with the + # following structure: + # [{'id':, 'children':, ....}, + # ...] + # + # We process this array and transform it into a dictionary + # (for efficient access) The transformed "search tree" is a + # dictionary with the following structure: + # { : {'children':} + def _get_tiers_size(self): + try: + resp, body = self.service.ceph_api.osd_df( + body='json', + output_method='tree') + except IOError: + return 0 + if not resp.ok: + LOG.error(_LE("Getting the cluster usage " + "information failed: %(reason)s - " + "%(body)s") % {"reason": resp.reason, + "body": body}) + return {} + + # A node is a crushmap element: root, chassis, host, osd. Create a + # dictionary for the nodes with the key as the id used for efficient + # searching through nodes. 
+ # + # For example: storage-0's node has one child node => OSD 0 + # { + # "id": -4, + # "name": "storage-0", + # "type": "host", + # "type_id": 1, + # "reweight": -1.000000, + # "kb": 51354096, + # "kb_used": 1510348, + # "kb_avail": 49843748, + # "utilization": 2.941047, + # "var": 1.480470, + # "pgs": 0, + # "children": [ + # 0 + # ] + # }, + search_tree = {} + for node in body['output']['nodes']: + search_tree[node['id']] = node + + # Extract the tiers as we will return a dict for the size of each tier + tiers = {k: v for k, v in search_tree.items() if v['type'] == 'root'} + + # For each tier, traverse the heirarchy from the root->chassis->host. + # Sum the host sizes to determine the overall size of the tier + tier_sizes = {} + for tier in tiers.values(): + tier_size = 0 + for chassis_id in tier['children']: + chassis_size = 0 + chassis = search_tree[chassis_id] + for host_id in chassis['children']: + host = search_tree[host_id] + if (chassis_size == 0 or + chassis_size > host['kb']): + chassis_size = host['kb'] + tier_size += chassis_size/(1024 ** 2) + tier_sizes[tier['name']] = tier_size + + return tier_sizes + + # ALARM HELPERS + + @staticmethod + def _check_storage_group(osd_tree, group_id, + hosts, osds, fn_report_alarm): + reasons = set() + degraded_hosts = set() + severity = fm_constants.FM_ALARM_SEVERITY_CRITICAL + for host_id in hosts: + if len(osds[host_id]) == 0: + reasons.add(constants.ALARM_REASON_NO_OSD) + degraded_hosts.add(host_id) + else: + for osd_id in osds[host_id]: + if osd_tree[osd_id]['status'] == 'up': + if osd_tree[osd_id]['reweight'] == 0.0: + reasons.add(constants.ALARM_REASON_OSDS_OUT) + degraded_hosts.add(host_id) + else: + severity = fm_constants.FM_ALARM_SEVERITY_MAJOR + elif osd_tree[osd_id]['status'] == 'down': + reasons.add(constants.ALARM_REASON_OSDS_DOWN) + degraded_hosts.add(host_id) + if constants.ALARM_REASON_OSDS_OUT in reasons \ + and constants.ALARM_REASON_OSDS_DOWN in reasons: + reasons.add(constants.ALARM_REASON_OSDS_DOWN_OUT) + reasons.remove(constants.ALARM_REASON_OSDS_OUT) + if constants.ALARM_REASON_OSDS_DOWN in reasons \ + and constants.ALARM_REASON_OSDS_DOWN_OUT in reasons: + reasons.remove(constants.ALARM_REASON_OSDS_DOWN) + reason = "/".join(list(reasons)) + if severity == fm_constants.FM_ALARM_SEVERITY_CRITICAL: + reason = "{} {}: {}".format( + fm_constants.ALARM_CRITICAL_REPLICATION, + osd_tree[group_id]['name'], + reason) + elif severity == fm_constants.FM_ALARM_SEVERITY_MAJOR: + reason = "{} {}: {}".format( + fm_constants.ALARM_MAJOR_REPLICATION, + osd_tree[group_id]['name'], + reason) + if len(degraded_hosts) == 0: + if len(hosts) < 2: + fn_report_alarm( + osd_tree[group_id]['name'], + "{} {}: {}".format( + fm_constants.ALARM_MAJOR_REPLICATION, + osd_tree[group_id]['name'], + constants.ALARM_REASON_PEER_HOST_DOWN), + fm_constants.FM_ALARM_SEVERITY_MAJOR) + elif len(degraded_hosts) == 1: + fn_report_alarm( + "{}.host={}".format( + osd_tree[group_id]['name'], + osd_tree[list(degraded_hosts)[0]]['name']), + reason, severity) + else: + fn_report_alarm( + osd_tree[group_id]['name'], + reason, severity) + + def _check_storage_tier(self, osd_tree, tier_name, fn_report_alarm): + for tier_id in osd_tree: + if osd_tree[tier_id]['type'] != 'root': + continue + if osd_tree[tier_id]['name'] != tier_name: + continue + for group_id in osd_tree[tier_id]['children']: + if osd_tree[group_id]['type'] != 'chassis': + continue + if not osd_tree[group_id]['name'].startswith('group-'): + continue + hosts = [] + osds = {} + for host_id in 
osd_tree[group_id]['children']: + if osd_tree[host_id]['type'] != 'host': + continue + hosts.append(host_id) + osds[host_id] = [] + for osd_id in osd_tree[host_id]['children']: + if osd_tree[osd_id]['type'] == 'osd': + osds[host_id].append(osd_id) + self._check_storage_group(osd_tree, group_id, hosts, + osds, fn_report_alarm) + break + + def _current_health_alarm_equals(self, reason, severity): + if not self.current_health_alarm: + return False + if getattr(self.current_health_alarm, 'severity', None) != severity: + return False + if getattr(self.current_health_alarm, 'reason_text', None) != reason: + return False + return True + + def _report_alarm_osds_health(self): + response, osd_tree = self.service.ceph_api.osd_tree(body='json') + if not response.ok: + LOG.error(_LE("Failed to retrieve Ceph OSD tree: " + "status_code: %(status_code)s, reason: %(reason)s") % + {"status_code": response.status_code, + "reason": response.reason}) + return + osd_tree = dict([(n['id'], n) for n in osd_tree['output']['nodes']]) + alarms = [] + + self._check_storage_tier(osd_tree, "storage-tier", + lambda *args: alarms.append(args)) + if self.cache_enabled: + self._check_storage_tier(osd_tree, "cache-tier", + lambda *args: alarms.append(args)) + + old_alarms = {} + for alarm_id in [ + fm_constants.FM_ALARM_ID_STORAGE_CEPH_MAJOR, + fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL]: + alarm_list = self.service.fm_api.get_faults_by_id(alarm_id) + if not alarm_list: + continue + for alarm in alarm_list: + if alarm.entity_instance_id not in old_alarms: + old_alarms[alarm.entity_instance_id] = [] + old_alarms[alarm.entity_instance_id].append( + (alarm.alarm_id, alarm.reason_text)) + + for peer_group, reason, severity in alarms: + if self._current_health_alarm_equals(reason, severity): + continue + alarm_critical_major = fm_constants.FM_ALARM_ID_STORAGE_CEPH_MAJOR + if severity == fm_constants.FM_ALARM_SEVERITY_CRITICAL: + alarm_critical_major = ( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL) + entity_instance_id = ( + self.service.entity_instance_id + '.peergroup=' + peer_group) + alarm_already_exists = False + if entity_instance_id in old_alarms: + for alarm_id, old_reason in old_alarms[entity_instance_id]: + if (reason == old_reason and + alarm_id == alarm_critical_major): + # if the alarm is exactly the same, we don't need + # to recreate it + old_alarms[entity_instance_id].remove( + (alarm_id, old_reason)) + alarm_already_exists = True + elif (alarm_id == alarm_critical_major): + # if we change just the reason, then we just remove the + # alarm from the list so we don't remove it at the + # end of the function + old_alarms[entity_instance_id].remove( + (alarm_id, old_reason)) + + if (len(old_alarms[entity_instance_id]) == 0): + del old_alarms[entity_instance_id] + + # in case the alarm is exactly the same, we skip the alarm set + if alarm_already_exists is True: + continue + major_repair_action = constants.REPAIR_ACTION_MAJOR_CRITICAL_ALARM + fault = fm_api.Fault( + alarm_id=alarm_critical_major, + alarm_type=fm_constants.FM_ALARM_TYPE_4, + alarm_state=fm_constants.FM_ALARM_STATE_SET, + entity_type_id=fm_constants.FM_ENTITY_TYPE_CLUSTER, + entity_instance_id=entity_instance_id, + severity=severity, + reason_text=reason, + probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_15, + proposed_repair_action=major_repair_action, + service_affecting=constants.SERVICE_AFFECTING['HEALTH_WARN']) + alarm_uuid = self.service.fm_api.set_fault(fault) + if alarm_uuid: + LOG.info(_LI( + "Created storage alarm %(alarm_uuid)s - " 
+ "severity: %(severity)s, reason: %(reason)s, " + "service_affecting: %(service_affecting)s") % { + "alarm_uuid": str(alarm_uuid), + "severity": str(severity), + "reason": reason, + "service_affecting": str( + constants.SERVICE_AFFECTING['HEALTH_WARN'])}) + else: + LOG.error(_LE( + "Failed to create storage alarm - " + "severity: %(severity)s, reason: %(reason)s, " + "service_affecting: %(service_affecting)s") % { + "severity": str(severity), + "reason": reason, + "service_affecting": str( + constants.SERVICE_AFFECTING['HEALTH_WARN'])}) + + for entity_instance_id in old_alarms: + for alarm_id, old_reason in old_alarms[entity_instance_id]: + self.service.fm_api.clear_fault(alarm_id, entity_instance_id) + + @staticmethod + def _parse_reason(health): + """ Parse reason strings received from Ceph """ + if health['health'] in constants.CEPH_STATUS_CUSTOM: + # Don't parse reason messages that we added + return "Storage Alarm Condition: %(health)s. %(detail)s" % health + + reasons_lst = health['detail'].split(';') + + parsed_reasons_text = "" + + # Check if PGs have issues - we can't safely store the entire message + # as it tends to be long + for reason in reasons_lst: + if "pgs" in reason: + parsed_reasons_text += "PGs are degraded/stuck or undersized" + break + + # Extract recovery status + parsed_reasons = [r.strip() for r in reasons_lst if 'recovery' in r] + if parsed_reasons: + parsed_reasons_text += ";" + ";".join(parsed_reasons) + + # We need to keep the most important parts of the messages when storing + # them to fm alarms, therefore text between [] brackets is truncated if + # max size is reached. + + # Add brackets, if needed + if len(parsed_reasons_text): + lbracket = " [" + rbracket = "]" + else: + lbracket = "" + rbracket = "" + + msg = {"head": "Storage Alarm Condition: ", + "tail": ". 
Please check 'ceph -s' for more details."} + max_size = constants.FM_ALARM_REASON_MAX_SIZE - \ + len(msg["head"]) - len(msg["tail"]) + + return ( + msg['head'] + + (health['health'] + lbracket + parsed_reasons_text)[:max_size-1] + + rbracket + msg['tail']) + + def _report_fault(self, health, alarm_id): + if alarm_id == fm_constants.FM_ALARM_ID_STORAGE_CEPH: + new_severity = constants.SEVERITY[health['health']] + new_reason_text = self._parse_reason(health) + new_service_affecting = \ + constants.SERVICE_AFFECTING[health['health']] + + # Raise or update alarm if necessary + if ((not self.current_health_alarm) or + (self.current_health_alarm.__dict__['severity'] != + new_severity) or + (self.current_health_alarm.__dict__['reason_text'] != + new_reason_text) or + (self.current_health_alarm.__dict__['service_affecting'] != + str(new_service_affecting))): + + fault = fm_api.Fault( + alarm_id=fm_constants.FM_ALARM_ID_STORAGE_CEPH, + alarm_type=fm_constants.FM_ALARM_TYPE_4, + alarm_state=fm_constants.FM_ALARM_STATE_SET, + entity_type_id=fm_constants.FM_ENTITY_TYPE_CLUSTER, + entity_instance_id=self.service.entity_instance_id, + severity=new_severity, + reason_text=new_reason_text, + probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_15, + proposed_repair_action=constants.REPAIR_ACTION, + service_affecting=new_service_affecting) + + alarm_uuid = self.service.fm_api.set_fault(fault) + if alarm_uuid: + LOG.info(_LI( + "Created storage alarm %(alarm_uuid)s - " + "severity: %(severity)s, reason: %(reason)s, " + "service_affecting: %(service_affecting)s") % { + "alarm_uuid": alarm_uuid, + "severity": new_severity, + "reason": new_reason_text, + "service_affecting": new_service_affecting}) + else: + LOG.error(_LE( + "Failed to create storage alarm - " + "severity: %(severity)s, reason: %(reason)s " + "service_affecting: %(service_affecting)s") % { + "severity": new_severity, + "reason": new_reason_text, + "service_affecting": new_service_affecting}) + + # Log detailed reason for later analysis + if (self.current_ceph_health != health['health'] or + self.detailed_health_reason != health['detail']): + LOG.info(_LI("Ceph status changed: %(health)s " + "detailed reason: %(detail)s") % health) + self.current_ceph_health = health['health'] + self.detailed_health_reason = health['detail'] + + elif (alarm_id == fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE and + not health['tier_eid'] in self.current_quota_alarms): + + quota_reason_text = ("Quota/Space mismatch for the %s tier. The " + "sum of Ceph pool quotas does not match the " + "tier size." % health['tier_name']) + fault = fm_api.Fault( + alarm_id=fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE, + alarm_state=fm_constants.FM_ALARM_STATE_SET, + entity_type_id=fm_constants.FM_ENTITY_TYPE_CLUSTER, + entity_instance_id=health['tier_eid'], + severity=fm_constants.FM_ALARM_SEVERITY_MINOR, + reason_text=quota_reason_text, + alarm_type=fm_constants.FM_ALARM_TYPE_7, + probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_75, + proposed_repair_action=( + "Update ceph storage pool quotas to use all available " + "cluster space for the %s tier." % health['tier_name']), + service_affecting=False) + + alarm_uuid = self.service.fm_api.set_fault(fault) + if alarm_uuid: + LOG.info(_LI( + "Created storage quota storage alarm %(alarm_uuid)s. " + "Reason: %(reason)s") % { + "alarm_uuid": alarm_uuid, "reason": quota_reason_text}) + else: + LOG.error(_LE("Failed to create quota " + "storage alarm. 
Reason: %s") % quota_reason_text) + + def _clear_fault(self, alarm_id, entity_instance_id=None): + # Only clear alarm if there is one already raised + if (alarm_id == fm_constants.FM_ALARM_ID_STORAGE_CEPH and + self.current_health_alarm): + LOG.info(_LI("Clearing health alarm")) + self.service.fm_api.clear_fault( + fm_constants.FM_ALARM_ID_STORAGE_CEPH, + self.service.entity_instance_id) + elif (alarm_id == fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE and + entity_instance_id in self.current_quota_alarms): + LOG.info(_LI("Clearing quota alarm with entity_instance_id %s") + % entity_instance_id) + self.service.fm_api.clear_fault( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE, + entity_instance_id) + + def clear_critical_alarm(self, group_name): + alarm_list = self.service.fm_api.get_faults_by_id( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL) + if alarm_list: + for alarm in range(len(alarm_list)): + group_id = alarm_list[alarm].entity_instance_id.find("group-") + group_instance_name = ( + "group-" + + alarm_list[alarm].entity_instance_id[group_id + 6]) + if group_name == group_instance_name: + self.service.fm_api.clear_fault( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL, + alarm_list[alarm].entity_instance_id) + + def clear_all_major_critical(self, group_name=None): + # clear major alarms + alarm_list = self.service.fm_api.get_faults_by_id( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_MAJOR) + if alarm_list: + for alarm in range(len(alarm_list)): + if group_name is not None: + group_id = ( + alarm_list[alarm].entity_instance_id.find("group-")) + group_instance_name = ( + "group-" + + alarm_list[alarm].entity_instance_id[group_id+6]) + if group_name == group_instance_name: + self.service.fm_api.clear_fault( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_MAJOR, + alarm_list[alarm].entity_instance_id) + else: + self.service.fm_api.clear_fault( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_MAJOR, + alarm_list[alarm].entity_instance_id) + # clear critical alarms + alarm_list = self.service.fm_api.get_faults_by_id( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL) + if alarm_list: + for alarm in range(len(alarm_list)): + if group_name is not None: + group_id = ( + alarm_list[alarm].entity_instance_id.find("group-")) + group_instance_name = ( + "group-" + + alarm_list[alarm].entity_instance_id[group_id + 6]) + if group_name == group_instance_name: + self.service.fm_api.clear_fault( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL, + alarm_list[alarm].entity_instance_id) + else: + self.service.fm_api.clear_fault( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL, + alarm_list[alarm].entity_instance_id) + + def _get_current_alarms(self): + """ Retrieve currently raised alarm """ + self.current_health_alarm = self.service.fm_api.get_fault( + fm_constants.FM_ALARM_ID_STORAGE_CEPH, + self.service.entity_instance_id) + quota_faults = self.service.fm_api.get_faults_by_id( + fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE) + if quota_faults: + self.current_quota_alarms = [f.entity_instance_id + for f in quota_faults] + else: + self.current_quota_alarms = [] diff --git a/ceph/ceph-manager/ceph-manager/ceph_manager/server.py b/ceph/ceph-manager/ceph-manager/ceph_manager/server.py new file mode 100644 index 00000000..9403a7c2 --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/ceph_manager/server.py @@ -0,0 +1,249 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright (c) 2016-2018 Wind River Systems, Inc. 
+# +# SPDX-License-Identifier: Apache-2.0 +# + +# https://chrigl.de/posts/2014/08/27/oslo-messaging-example.html +# http://docs.openstack.org/developer/oslo.messaging/server.html + +import sys + +# noinspection PyUnresolvedReferences +import eventlet +# noinspection PyUnresolvedReferences +import oslo_messaging as messaging +# noinspection PyUnresolvedReferences +from fm_api import fm_api +# noinspection PyUnresolvedReferences +from oslo_config import cfg +# noinspection PyUnresolvedReferences +from oslo_log import log as logging +# noinspection PyUnresolvedReferences +from oslo_service import service +# noinspection PyUnresolvedReferences +from oslo_service.periodic_task import PeriodicTasks +# noinspection PyUnresolvedReferences +from oslo_service import loopingcall + +from sysinv.conductor.cache_tiering_service_config import ServiceConfig + +# noinspection PyUnresolvedReferences +from cephclient import wrapper + +from monitor import Monitor +from cache_tiering import CacheTiering +import exception +import constants + +from i18n import _LI, _LW +from retrying import retry + +eventlet.monkey_patch(all=True) + +CONF = cfg.CONF +CONF.register_opts([ + cfg.StrOpt('sysinv_api_bind_ip', + default='0.0.0.0', + help='IP for the Ceph Manager server to bind to')]) +CONF.logging_default_format_string = ( + '%(asctime)s.%(msecs)03d %(process)d ' + '%(levelname)s %(name)s [-] %(message)s') +logging.register_options(CONF) +logging.setup(CONF, __name__) +LOG = logging.getLogger(__name__) +CONF.rpc_backend = 'rabbit' + + +class RpcEndpoint(PeriodicTasks): + + def __init__(self, service=None): + self.service = service + + def cache_tiering_enable_cache(self, _, new_config, applied_config): + LOG.info(_LI("Enabling cache")) + try: + self.service.cache_tiering.enable_cache( + new_config, applied_config) + except exception.CephManagerException as e: + self.service.sysinv_conductor.call( + {}, 'cache_tiering_enable_cache_complete', + success=False, exception=str(e.message), + new_config=new_config, applied_config=applied_config) + + def cache_tiering_disable_cache(self, _, new_config, applied_config): + LOG.info(_LI("Disabling cache")) + try: + self.service.cache_tiering.disable_cache( + new_config, applied_config) + except exception.CephManagerException as e: + self.service.sysinv_conductor.call( + {}, 'cache_tiering_disable_cache_complete', + success=False, exception=str(e.message), + new_config=new_config, applied_config=applied_config) + + def cache_tiering_operation_in_progress(self, _): + is_locked = self.service.cache_tiering.is_locked() + LOG.info(_LI("Cache tiering operation " + "is in progress: %s") % str(is_locked).lower()) + return is_locked + + def get_primary_tier_size(self, _): + """Get the ceph size for the primary tier. + + returns: an int for the size (in GB) of the tier + """ + + tiers_size = self.service.monitor.tiers_size + primary_tier_size = tiers_size.get( + self.service.monitor.primary_tier_name, 0) + LOG.debug(_LI("Ceph cluster primary tier size: %s GB") % + str(primary_tier_size)) + return primary_tier_size + + def get_tiers_size(self, _): + """Get the ceph cluster tier sizes. + + returns: a dict of sizes (in GB) by tier name + """ + + tiers_size = self.service.monitor.tiers_size + LOG.debug(_LI("Ceph cluster tiers (size in GB): %s") % + str(tiers_size)) + return tiers_size + + def is_cluster_up(self, _): + """Report if the last health check was successful. 
+ + This is an independent view of the cluster accessibility that can be + used by the sysinv conductor to gate ceph API calls which would timeout + and potentially block other operations. + + This view is only updated at the rate the monitor checks for a cluster + uuid or a health check (CEPH_HEALTH_CHECK_INTERVAL) + + returns: boolean True if last health check was successful else False + """ + return self.service.monitor.cluster_is_up + + +# This class is needed only when upgrading from 16.10 to 17.x +# TODO: remove it after 1st 17.x release +# +class SysinvConductorUpgradeApi(object): + def __init__(self): + self.sysinv_conductor = None + super(SysinvConductorUpgradeApi, self).__init__() + + def get_software_upgrade_status(self): + LOG.info(_LI("Getting software upgrade status from sysinv")) + cctxt = self.sysinv_conductor.prepare(timeout=2) + upgrade = cctxt.call({}, 'get_software_upgrade_status') + LOG.info(_LI("Software upgrade status: %s") % str(upgrade)) + return upgrade + + @retry(wait_fixed=1000, + retry_on_exception=lambda exception: + LOG.warn(_LW( + "Getting software upgrade status failed " + "with: %s. Retrying... ") % str(exception)) or True) + def retry_get_software_upgrade_status(self): + return self.get_software_upgrade_status() + + +class Service(SysinvConductorUpgradeApi, service.Service): + + def __init__(self, conf): + super(Service, self).__init__() + self.conf = conf + self.rpc_server = None + self.sysinv_conductor = None + self.ceph_api = None + self.entity_instance_id = '' + self.fm_api = fm_api.FaultAPIs() + self.monitor = Monitor(self) + self.cache_tiering = CacheTiering(self) + self.config = None + self.config_desired = None + self.config_applied = None + + def start(self): + super(Service, self).start() + transport = messaging.get_transport(self.conf) + self.sysinv_conductor = messaging.RPCClient( + transport, + messaging.Target( + topic=constants.SYSINV_CONDUCTOR_TOPIC)) + + self.ceph_api = wrapper.CephWrapper( + endpoint='http://localhost:5001/api/v0.1/') + + # Get initial config from sysinv and send it to + # services that need it before starting them + config = self.get_caching_tier_config() + self.monitor.setup(config) + self.rpc_server = messaging.get_rpc_server( + transport, + messaging.Target(topic=constants.CEPH_MANAGER_TOPIC, + server=self.conf.sysinv_api_bind_ip), + [RpcEndpoint(self)], + executor='eventlet') + self.rpc_server.start() + self.cache_tiering.set_initial_config(config) + eventlet.spawn_n(self.monitor.run) + periodic = loopingcall.FixedIntervalLoopingCall( + self.update_ceph_target_max_bytes) + periodic.start(interval=300) + + def get_caching_tier_config(self): + LOG.info("Getting cache tiering configuration from sysinv") + while True: + # Get initial configuration from sysinv, + # retry until sysinv starts + try: + cctxt = self.sysinv_conductor.prepare(timeout=2) + config = cctxt.call({}, 'cache_tiering_get_config') + for section in config: + if section == constants.CACHE_TIERING: + self.config = ServiceConfig().from_dict( + config[section]) + elif section == constants.CACHE_TIERING_DESIRED: + self.config_desired = ServiceConfig().from_dict( + config[section]) + elif section == constants.CACHE_TIERING_APPLIED: + self.config_applied = ServiceConfig().from_dict( + config[section]) + LOG.info("Cache tiering configs: {}".format(config)) + return config + except Exception as ex: + # In production we should retry on every error until connection + # is reestablished. + LOG.warn("Getting cache tiering configuration failed " + "with: {}. 
Retrying... ".format(str(ex))) + + def stop(self): + try: + self.rpc_server.stop() + self.rpc_server.wait() + except Exception: + pass + super(Service, self).stop() + + def update_ceph_target_max_bytes(self): + try: + self.cache_tiering.update_cache_target_max_bytes() + except Exception as ex: + LOG.exception("Updating Ceph target max bytes failed " + "with: {} retrying on next cycle.".format(str(ex))) + + +def run_service(): + CONF(sys.argv[1:]) + logging.setup(CONF, "ceph-manager") + launcher = service.launch(CONF, Service(CONF), workers=1) + launcher.wait() + + +if __name__ == "__main__": + run_service() diff --git a/ceph/ceph-manager/ceph-manager/ceph_manager/tests/__init__.py b/ceph/ceph-manager/ceph-manager/ceph_manager/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ceph/ceph-manager/ceph-manager/ceph_manager/tests/test_cache_flush.py b/ceph/ceph-manager/ceph-manager/ceph_manager/tests/test_cache_flush.py new file mode 100644 index 00000000..2fd26519 --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/ceph_manager/tests/test_cache_flush.py @@ -0,0 +1,309 @@ +# +# Copyright (c) 2016 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + +import unittest +import mock + +import subprocess +import math + +from ..cache_tiering import CacheTiering +from ..cache_tiering import LOG as CT_LOG +from ..constants import CACHE_FLUSH_OBJECTS_THRESHOLD +from ..constants import CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC as MIN_WAIT +from ..constants import CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC as MAX_WAIT +from ..exception import CephCacheFlushFailure + + +class TestCacheFlush(unittest.TestCase): + + def setUp(self): + self.service = mock.Mock() + self.ceph_api = mock.Mock() + self.service.ceph_api = self.ceph_api + self.cache_tiering = CacheTiering(self.service) + + @mock.patch('subprocess.check_call') + def test_set_param_fail(self, mock_proc_call): + self.ceph_api.osd_set_pool_param = mock.Mock() + self.ceph_api.osd_set_pool_param.return_value = ( + mock.Mock(ok=False, status_code=500, reason='denied'), + {}) + self.cache_tiering.cache_flush({'pool_name': 'test'}) + mock_proc_call.assert_called_with( + ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) + + @mock.patch('subprocess.check_call') + def test_df_fail(self, mock_proc_call): + self.ceph_api.osd_set_pool_param = mock.Mock() + self.ceph_api.osd_set_pool_param.return_value = ( + mock.Mock(ok=True, status_code=200, reason='OK'), + {}) + self.ceph_api.df = mock.Mock() + self.ceph_api.df.return_value = ( + mock.Mock(ok=False, status_code=500, reason='denied'), + {}) + self.cache_tiering.cache_flush({'pool_name': 'test'}) + self.ceph_api.osd_set_pool_param.assert_called_once_with( + 'test-cache', 'target_max_objects', 1, force=None, body='json') + mock_proc_call.assert_called_with( + ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) + + @mock.patch('subprocess.check_call') + def test_rados_evict_fail_raises(self, mock_proc_call): + mock_proc_call.side_effect = subprocess.CalledProcessError(1, ['cmd']) + self.ceph_api.osd_set_pool_param = mock.Mock() + self.ceph_api.osd_set_pool_param.return_value = ( + mock.Mock(ok=False, status_code=500, reason='denied'), + {}) + self.assertRaises(CephCacheFlushFailure, + self.cache_tiering.cache_flush, + {'pool_name': 'test'}) + mock_proc_call.assert_called_with( + ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) + + @mock.patch('subprocess.check_call') + def test_df_missing_pool(self, mock_proc_call): + 
self.ceph_api.osd_set_pool_param = mock.Mock() + self.ceph_api.osd_set_pool_param.return_value = ( + mock.Mock(ok=True, status_code=200, reason='OK'), + {}) + self.ceph_api.df = mock.Mock() + self.ceph_api.df.return_value = ( + mock.Mock(ok=True, status_code=200, reason='OK'), + {'output': { + 'pools': [ + {'id': 0, + 'name': 'rbd', + 'stats': {'bytes_used': 0, + 'kb_used': 0, + 'max_avail': 9588428800, + 'objects': 0}}]}, + 'status': 'OK'}) + with mock.patch.object(CT_LOG, 'warn') as mock_lw: + self.cache_tiering.cache_flush({'pool_name': 'test'}) + self.ceph_api.df.assert_called_once_with(body='json') + for c in mock_lw.call_args_list: + if 'Missing pool free space' in c[0][0]: + break + else: + self.fail('expected log warning') + self.ceph_api.osd_set_pool_param.assert_called_once_with( + 'test-cache', 'target_max_objects', 1, force=None, body='json') + mock_proc_call.assert_called_with( + ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) + + @mock.patch('subprocess.check_call') + def test_df_objects_empty(self, mock_proc_call): + self.ceph_api.osd_set_pool_param = mock.Mock() + self.ceph_api.osd_set_pool_param.return_value = ( + mock.Mock(ok=True, status_code=200, reason='OK'), + {}) + self.ceph_api.df = mock.Mock() + self.ceph_api.df.return_value = ( + mock.Mock(ok=True, status_code=200, reason='OK'), + {'output': { + 'pools': [ + {'id': 0, + 'name': 'test-cache', + 'stats': {'bytes_used': 0, + 'kb_used': 0, + 'max_avail': 9588428800, + 'objects': 0}}]}, + 'status': 'OK'}) + self.cache_tiering.cache_flush({'pool_name': 'test'}) + self.ceph_api.df.assert_called_once_with(body='json') + self.ceph_api.osd_set_pool_param.assert_called_once_with( + 'test-cache', 'target_max_objects', 1, force=None, body='json') + mock_proc_call.assert_called_with( + ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) + + @mock.patch('time.sleep') + @mock.patch('subprocess.check_call') + def test_df_objects_above_threshold(self, mock_proc_call, mock_time_sleep): + self.ceph_api.osd_set_pool_param = mock.Mock() + self.ceph_api.osd_set_pool_param.return_value = ( + mock.Mock(ok=True, status_code=200, reason='OK'), + {}) + self.ceph_api.df = mock.Mock() + self.ceph_api.df.side_effect = [ + (mock.Mock(ok=True, status_code=200, reason='OK'), + {'output': { + 'pools': [ + {'id': 0, + 'name': 'test-cache', + 'stats': {'bytes_used': 0, + 'kb_used': 0, + 'max_avail': 9588428800, + 'objects': CACHE_FLUSH_OBJECTS_THRESHOLD}}]}, + 'status': 'OK'}), + (mock.Mock(ok=True, status_code=200, reason='OK'), + {'output': { + 'pools': [ + {'id': 0, + 'name': 'test-cache', + 'stats': {'bytes_used': 0, + 'kb_used': 0, + 'max_avail': 9588428800, + 'objects': + CACHE_FLUSH_OBJECTS_THRESHOLD - 1}}]}, + 'status': 'OK'})] + self.cache_tiering.cache_flush({'pool_name': 'test'}) + self.ceph_api.osd_set_pool_param.assert_called_once_with( + 'test-cache', 'target_max_objects', 1, force=None, body='json') + self.ceph_api.df.assert_called_with(body='json') + mock_time_sleep.assert_called_once_with(MIN_WAIT) + mock_proc_call.assert_called_with( + ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) + + @mock.patch('time.sleep') + @mock.patch('subprocess.check_call') + def test_df_objects_interval_increase(self, mock_proc_call, + mock_time_sleep): + self.ceph_api.osd_set_pool_param = mock.Mock() + self.ceph_api.osd_set_pool_param.return_value = ( + mock.Mock(ok=True, status_code=200, reason='OK'), + {}) + self.ceph_api.df = mock.Mock() + self.ceph_api.df.side_effect = [ + (mock.Mock(ok=True, 
status_code=200, reason='OK'), + {'output': { + 'pools': [ + {'id': 0, + 'name': 'test-cache', + 'stats': {'bytes_used': 0, + 'kb_used': 0, + 'max_avail': 9588428800, + 'objects': + CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]}, + 'status': 'OK'}), + (mock.Mock(ok=True, status_code=200, reason='OK'), + {'output': { + 'pools': [ + {'id': 0, + 'name': 'test-cache', + 'stats': {'bytes_used': 0, + 'kb_used': 0, + 'max_avail': 9588428800, + 'objects': + CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]}, + 'status': 'OK'}), + (mock.Mock(ok=True, status_code=200, reason='OK'), + {'output': { + 'pools': [ + {'id': 0, + 'name': 'test-cache', + 'stats': {'bytes_used': 0, + 'kb_used': 0, + 'max_avail': 9588428800, + 'objects': + CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]}, + 'status': 'OK'}), + (mock.Mock(ok=True, status_code=200, reason='OK'), + {'output': { + 'pools': [ + {'id': 0, + 'name': 'test-cache', + 'stats': {'bytes_used': 0, + 'kb_used': 0, + 'max_avail': 9588428800, + 'objects': + CACHE_FLUSH_OBJECTS_THRESHOLD - 1}}]}, + 'status': 'OK'})] + self.cache_tiering.cache_flush({'pool_name': 'test'}) + self.ceph_api.osd_set_pool_param.assert_called_once_with( + 'test-cache', 'target_max_objects', 1, force=None, body='json') + self.ceph_api.df.assert_called_with(body='json') + self.assertEqual([c[0][0] for c in mock_time_sleep.call_args_list], + [MIN_WAIT, + MIN_WAIT * 2, + MIN_WAIT * 4]) + mock_proc_call.assert_called_with( + ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) + + @mock.patch('time.sleep') + @mock.patch('subprocess.check_call') + def test_df_objects_allways_over_threshold(self, mock_proc_call, + mock_time_sleep): + self.ceph_api.osd_set_pool_param = mock.Mock() + self.ceph_api.osd_set_pool_param.return_value = ( + mock.Mock(ok=True, status_code=200, reason='OK'), + {}) + self.ceph_api.df = mock.Mock() + self.ceph_api.df.return_value = ( + mock.Mock(ok=True, status_code=200, reason='OK'), + {'output': { + 'pools': [ + {'id': 0, + 'name': 'test-cache', + 'stats': {'bytes_used': 0, + 'kb_used': 0, + 'max_avail': 9588428800, + 'objects': + CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]}, + 'status': 'OK'}) + # noinspection PyTypeChecker + mock_time_sleep.side_effect = \ + [None]*int(math.ceil(math.log(float(MAX_WAIT)/MIN_WAIT, 2)) + 1) \ + + [Exception('too many sleeps')] + self.cache_tiering.cache_flush({'pool_name': 'test'}) + self.ceph_api.osd_set_pool_param.assert_called_once_with( + 'test-cache', 'target_max_objects', 1, force=None, body='json') + self.ceph_api.df.assert_called_with(body='json') + expected_sleep = [] + interval = MIN_WAIT + while interval <= MAX_WAIT: + expected_sleep.append(interval) + interval *= 2 + self.assertEqual([c[0][0] for c in mock_time_sleep.call_args_list], + expected_sleep) + mock_proc_call.assert_called_with( + ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) + + @mock.patch('time.sleep') + @mock.patch('subprocess.check_call') + def test_df_objects_increase(self, mock_proc_call, mock_time_sleep): + self.ceph_api.osd_set_pool_param = mock.Mock() + self.ceph_api.osd_set_pool_param.return_value = ( + mock.Mock(ok=True, status_code=200, reason='OK'), + {}) + self.ceph_api.df = mock.Mock() + self.ceph_api.df.side_effect = [ + (mock.Mock(ok=True, status_code=200, reason='OK'), + {'output': { + 'pools': [ + {'id': 0, + 'name': 'test-cache', + 'stats': {'bytes_used': 0, + 'kb_used': 0, + 'max_avail': 9588428800, + 'objects': + CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]}, + 'status': 'OK'}), + (mock.Mock(ok=True, status_code=200, reason='OK'), + {'output': { + 'pools': 
[ + {'id': 0, + 'name': 'test-cache', + 'stats': {'bytes_used': 0, + 'kb_used': 0, + 'max_avail': 9588428800, + 'objects': + CACHE_FLUSH_OBJECTS_THRESHOLD + 2}}]}, + 'status': 'OK'})] + with mock.patch.object(CT_LOG, 'warn') as mock_lw: + self.cache_tiering.cache_flush({'pool_name': 'test'}) + for c in mock_lw.call_args_list: + if 'Unexpected increase' in c[0][0]: + break + else: + self.fail('expected log warning') + self.ceph_api.df.assert_called_with(body='json') + mock_time_sleep.assert_called_once_with(MIN_WAIT) + self.ceph_api.osd_set_pool_param.assert_called_once_with( + 'test-cache', 'target_max_objects', 1, force=None, body='json') + mock_proc_call.assert_called_with( + ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) diff --git a/ceph/ceph-manager/ceph-manager/setup.py b/ceph/ceph-manager/ceph-manager/setup.py new file mode 100644 index 00000000..40cf5012 --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/setup.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python +# +# Copyright (c) 2013-2014, 2016 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + + +import setuptools + +setuptools.setup( + name='ceph_manager', + version='1.0.0', + description='CEPH manager', + license='Apache-2.0', + packages=['ceph_manager'], + entry_points={ + } +) diff --git a/ceph/ceph-manager/ceph-manager/test-requirements.txt b/ceph/ceph-manager/ceph-manager/test-requirements.txt new file mode 100644 index 00000000..1fdf2056 --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/test-requirements.txt @@ -0,0 +1,10 @@ +# The order of packages is significant, because pip processes them in the order +# of appearance. Changing the order has an impact on the overall integration +# process, which may cause wedges in the gate later. + +mock +flake8 +eventlet +pytest +oslo.log +oslo.i18n \ No newline at end of file diff --git a/ceph/ceph-manager/ceph-manager/tox.ini b/ceph/ceph-manager/ceph-manager/tox.ini new file mode 100644 index 00000000..41d3854b --- /dev/null +++ b/ceph/ceph-manager/ceph-manager/tox.ini @@ -0,0 +1,29 @@ +# adapted from glance tox.ini + +[tox] +minversion = 1.6 +envlist = py27,pep8 +skipsdist = True +# tox does not work if the path to the workdir is too long, so move it to /tmp +toxworkdir = /tmp/{env:USER}_ceph_manager_tox + +[testenv] +setenv = VIRTUAL_ENV={envdir} +usedevelop = True +install_command = pip install --no-use-wheel -U --force-reinstall {opts} {packages} +deps = -r{toxinidir}/test-requirements.txt +commands = py.test {posargs} +whitelist_externals = bash +passenv = http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY + +[testenv:py27] +basepython = python2.7 +setenv = + PYTHONPATH={toxinidir}/../../../../sysinv/recipes-common/sysinv/sysinv:{toxinidir}/../../../../config/recipes-common/tsconfig/tsconfig + +[testenv:pep8] +commands = + flake8 {posargs} + +[flake8] +exclude = .venv,.git,.tox,dist,doc,etc,*glance/locale*,*lib/python*,*egg,build diff --git a/ceph/ceph-manager/files/ceph-manager.logrotate b/ceph/ceph-manager/files/ceph-manager.logrotate new file mode 100644 index 00000000..8d7a16ab --- /dev/null +++ b/ceph/ceph-manager/files/ceph-manager.logrotate @@ -0,0 +1,11 @@ +/var/log/ceph-manager.log { + nodateext + size 10M + start 1 + rotate 10 + missingok + notifempty + compress + delaycompress + copytruncate +} diff --git a/ceph/ceph-manager/files/ceph-manager.service b/ceph/ceph-manager/files/ceph-manager.service new file mode 100644 index 00000000..e8bf26cf --- /dev/null +++ b/ceph/ceph-manager/files/ceph-manager.service @@ -0,0 +1,17 @@ 
+[Unit] +Description=Handle Ceph API calls and provide status updates via alarms +After=ceph.target + +[Service] +Type=forking +Restart=no +KillMode=process +RemainAfterExit=yes +ExecStart=/etc/rc.d/init.d/ceph-manager start +ExecStop=/etc/rc.d/init.d/ceph-manager stop +ExecReload=/etc/rc.d/init.d/ceph-manager reload +PIDFile=/var/run/ceph/ceph-manager.pid + +[Install] +WantedBy=multi-user.target + diff --git a/ceph/ceph-manager/scripts/bin/ceph-manager b/ceph/ceph-manager/scripts/bin/ceph-manager new file mode 100644 index 00000000..9aa4330d --- /dev/null +++ b/ceph/ceph-manager/scripts/bin/ceph-manager @@ -0,0 +1,17 @@ +#!/usr/bin/env python +# +# Copyright (c) 2016 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + + +import sys + +try: + from ceph_manager.server import run_service +except EnvironmentError as e: + print >> sys.stderr, "Error importing ceph_manager: ", str(e) + sys.exit(1) + +run_service() diff --git a/ceph/ceph-manager/scripts/init.d/ceph-manager b/ceph/ceph-manager/scripts/init.d/ceph-manager new file mode 100644 index 00000000..88bdddfb --- /dev/null +++ b/ceph/ceph-manager/scripts/init.d/ceph-manager @@ -0,0 +1,103 @@ +#!/bin/sh +# +# Copyright (c) 2013-2014, 2016 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + + +### BEGIN INIT INFO +# Provides: ceph-manager +# Required-Start: $ceph +# Required-Stop: $ceph +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: Daemon for polling ceph status +# Description: Daemon for polling ceph status +### END INIT INFO + +DESC="ceph-manager" +DAEMON="/usr/bin/ceph-manager" +RUNDIR="/var/run/ceph" +PIDFILE=$RUNDIR/$DESC.pid + +CONFIGFILE="/etc/sysinv/sysinv.conf" +LOGFILE="/var/log/ceph-manager.log" + +start() +{ + if [ -e $PIDFILE ]; then + PIDDIR=/proc/$(cat $PIDFILE) + if [ -d ${PIDDIR} ]; then + echo "$DESC already running." + exit 0 + else + echo "Removing stale PID file $PIDFILE" + rm -f $PIDFILE + fi + fi + + echo -n "Starting $DESC..." + mkdir -p $RUNDIR + start-stop-daemon --start --quiet \ + --pidfile ${PIDFILE} --exec ${DAEMON} \ + --make-pidfile --background \ + -- --log-file=$LOGFILE --config-file=$CONFIGFILE + + if [ $? -eq 0 ]; then + echo "done." + else + echo "failed." + exit 1 + fi +} + +stop() +{ + echo -n "Stopping $DESC..." + start-stop-daemon --stop --quiet --pidfile $PIDFILE --retry 60 + if [ $? -eq 0 ]; then + echo "done." + else + echo "failed." + fi + rm -f $PIDFILE +} + +status() +{ + pid=`cat $PIDFILE 2>/dev/null` + if [ -n "$pid" ]; then + if ps -p $pid &> /dev/null ; then + echo "$DESC is running" + exit 0 + else + echo "$DESC is not running but has pid file" + exit 1 + fi + fi + echo "$DESC is not running" + exit 3 +} + +case "$1" in + start) + start + ;; + stop) + stop + ;; + restart|force-reload|reload) + stop + start + ;; + status) + status + ;; + *) + echo "Usage: $0 {start|stop|force-reload|restart|reload|status}" + exit 1 + ;; +esac + +exit 0
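
For reference, the RPC endpoints registered in server.py above (is_cluster_up, get_primary_tier_size, get_tiers_size) can be exercised with a small oslo.messaging client once the daemon is running. The sketch below is illustrative only and is not part of this change: the transport URL, the 10 second timeout and the empty request context are assumptions for a standalone check; in a deployment the sysinv conductor supplies this plumbing from its own configuration.

    # Minimal sketch, assuming a reachable RabbitMQ transport URL.
    # The topic matches CEPH_MANAGER_TOPIC in constants.py above.
    import sys

    from oslo_config import cfg
    import oslo_messaging as messaging

    CEPH_MANAGER_TOPIC = 'sysinv.ceph_manager'


    def query_ceph_manager(transport_url):
        transport = messaging.get_transport(cfg.CONF, url=transport_url)
        client = messaging.RPCClient(
            transport, messaging.Target(topic=CEPH_MANAGER_TOPIC))
        cctxt = client.prepare(timeout=10)
        # RpcEndpoint methods ignore their context argument, so an empty
        # dict is passed here, mirroring the calls made by sysinv.
        cluster_up = cctxt.call({}, 'is_cluster_up')
        tiers_size = cctxt.call({}, 'get_tiers_size')
        return cluster_up, tiers_size


    if __name__ == '__main__':
        up, tiers = query_ceph_manager(sys.argv[1])
        print('cluster up: {}, tiers (GB): {}'.format(up, tiers))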