charm-interface-ceph-rbd-mi.../requires.py

261 lines
10 KiB
Python

# Copyright 2018 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ipaddress
import json
import socket
import uuid
from netaddr.core import AddrFormatError
# the reactive framework unfortunately does not grok `import as` in conjunction
# with decorators on class instance methods, so we have to revert to `from ...`
# imports
from charms.reactive import (
Endpoint,
all_flags_set,
clear_flag,
set_flag,
when,
when_not,
)
import charmhelpers.contrib.storage.linux.ceph as ch_ceph
import charmhelpers.contrib.network.ip as ch_ip
class CephRBDMirrorRequires(Endpoint):
def __init__(self, endpoint_name, relation_ids=None, unique_id=None):
"""Initialize unique ID.
This is used when requesting a key from Ceph.
The unique_id constructor parameter exists mainly for testing purposes.
"""
if unique_id:
self.unique_id = unique_id
else:
self.unique_id = socket.gethostname()
self.key_name = '{}_key'.format(self.unique_id)
super().__init__(endpoint_name, relation_ids=relation_ids)
@when('endpoint.{endpoint_name}.joined')
def joined(self):
set_flag(self.expand_name('{endpoint_name}.connected'))
@when('endpoint.{endpoint_name}.changed')
def changed(self):
flags = (
self.expand_name(
'endpoint.{endpoint_name}.changed.auth'),
self.expand_name(
'endpoint.{endpoint_name}.changed.' + self.key_name),
self.expand_name(
'endpoint.{endpoint_name}.changed.ceph-public-address'),
)
if all_flags_set(*flags):
for flag in (flags):
clear_flag(flag)
set_flag(self.expand_name('{endpoint_name}.available'))
@when_not('endpoint.{endpoint_name}.joined')
def broken(self):
clear_flag(self.expand_name('{endpoint_name}.available'))
clear_flag(self.expand_name('{endpoint_name}.connected'))
def request_key(self):
"""Request key from Ceph by providing our unique ID."""
for relation in self.relations:
relation.to_publish['unique_id'] = self.unique_id
def refresh_pools(self):
"""Refresh list of pools by setting a nonce on the relation."""
for relation in self.relations:
relation.to_publish['nonce'] = str(uuid.uuid4())
def create_replicated_pool(self, name, replicas=3, weight=None,
pg_num=None, group=None, namespace=None,
app_name=None, max_bytes=None,
max_objects=None):
"""Request replicated pool setup.
Refer to charm-helpers ``add_op_create_replicated_pool`` function for
documentation of parameters.
"""
# Ensure type of numeric values before sending over the wire
replicas = int(replicas) if replicas else None
weight = float(weight) if weight else None
pg_num = int(pg_num) if pg_num else None
max_bytes = int(max_bytes) if max_bytes else None
max_objects = int(max_objects) if max_objects else None
for relation in self.relations:
current_request = ch_ceph.get_previous_request(
relation.relation_id) or ch_ceph.CephBrokerRq()
for req in current_request.ops:
if 'op' in req and 'name' in req:
if req['op'] == 'create-pool' and req['name'] == name:
# request already exists, don't create a new one
return
current_request.add_op_create_replicated_pool(
name="{}".format(name),
replica_count=replicas,
pg_num=pg_num,
weight=weight,
group=group,
namespace=namespace,
app_name=app_name,
max_bytes=max_bytes,
max_objects=max_objects)
ch_ceph.send_request_if_needed(current_request,
relation=self.endpoint_name)
def create_erasure_pool(self, name, erasure_profile=None, weight=None,
group=None, app_name=None, max_bytes=None,
max_objects=None):
"""Request erasure coded pool setup.
Refer to charm-helpers ``add_op_create_erasure_pool``function for
documentation of parameters.
"""
# Ensure type of numeric values before sending over the wire
weight = float(weight) if weight else None
max_bytes = int(max_bytes) if max_bytes else None
max_objects = int(max_objects) if max_objects else None
for relation in self.relations:
current_request = ch_ceph.get_previous_request(
relation.relation_id) or ch_ceph.CephBrokerRq()
for req in current_request.ops:
if 'op' in req and 'name' in req:
if req['op'] == 'create-pool' and req['name'] == name:
# request already exists, don't create a new one
return
current_request.add_op_create_erasure_pool(
name="{}".format(name),
erasure_profile=erasure_profile,
weight=weight,
group=group,
app_name=app_name,
max_bytes=max_bytes,
max_objects=max_objects)
ch_ceph.send_request_if_needed(current_request,
relation=self.endpoint_name)
def maybe_send_rq(self, rq):
"""Send single broker request with all operations if needed.
The rbd-mirror charm has two endpoints using this interface connected
to the ceph-mon in the local and remote clusters. Subsequently each
relation typically only has one other participant, the ceph-mon.
The charm will recieve a verbatim copy of every broker request the
ceph-mon knows about in one end and then extract and filter all the
operations and collapse into one new single broker request that is
maintained with the ceph-mon in the other end.
:param rq: Broker Request to evaluate for sending.
:type rq: ch_ceph.CephBrokerRq
"""
for relation in self.relations:
ch_ceph.send_request_if_needed(rq, relation=self.endpoint_name)
@property
def auth(self):
"""Retrieve ``auth`` from relation data."""
return self.all_joined_units.received['auth']
@property
def key(self):
"""Retrieve key from relation data."""
return self.all_joined_units.received[self.key_name]
def mon_hosts(self):
"""Providwe iterable with address of individual related ceph-mon units.
NOTE(fnordahl): As much as this should and could have been a property
we have pre-existing interfaces providing this as a function. To be
able to use the same code for relation adaption etc in
``charms.openstack`` we must keep having this as a function unless we
go back and change both to being properties.
"""
for relation in self.relations:
for unit in relation.units:
try:
addr = ipaddress.ip_address(
unit.received.get('ceph-public-address', ''))
except ValueError:
continue
port = 6789
if isinstance(addr, ipaddress.IPv6Address):
yield '[{}]:{}'.format(addr, port)
else:
yield '{}:{}'.format(addr, port)
@property
def public_network(self):
"""Get CIDR for the Ceph public network.
The public network address advertiesed on the relation is mapped to the
corrensponding local interface from which we get the netmask/cidr of
the network.
:returns: CIDR or None
:rtype: Option[str, None]
"""
public_addr = self.all_joined_units.received['ceph-public-address']
if public_addr:
try:
return ch_ip.resolve_network_cidr(public_addr)
except AddrFormatError:
# LP#1898299 in some cases the netmask will be None, which
# leads to an AddrFormatError. In this case, we should return
# None
return None
@property
def cluster_network(self):
"""Get CIDR for the Ceph cluster network.
The cluster network address advertiesed on the relation is mapped to
the corrensponding local interface from which we get the netmask/cidr
of the network.
:returns: CIDR or None
:rtype: Option[str, None]
"""
cluster_addr = self.all_joined_units.received['ceph-cluster-address']
if cluster_addr:
try:
return ch_ip.resolve_network_cidr(cluster_addr)
except AddrFormatError:
# LP#1898299 in some cases the netmask will be None, which
# leads to an AddrFormatError. In this case, we should return
# None
return None
@property
def pools(self):
return self.all_joined_units.received['pools']
@property
def broker_requests(self):
if ('broker_requests' in self.all_joined_units.received and
self.all_joined_units.received['broker_requests'] is not None):
for json_rq in self.all_joined_units.received['broker_requests']:
yield json.loads(json_rq)
# Empty return in generator provides empty iterator and not None PEP479
return