Add mount automation example based on Zaqar

This example consists of three parts:
- Server side hook module
'contrib.share_driver_hooks.zaqar_notification' that can be enabled in Manila
- Client side consumer module
'contrib.share_driver_hooks.zaqar_notification_example_consumer' that can be
used in any user machine.
- Common module 'contrib.share_driver_hooks.zaqarclientwrapper' that is used
by server and client side modules for initialization of Zaqar client.

Details of its usage are described in file
'contrib/share_driver_hooks/README.rst'

Change-Id: I5e802ee2e2a4dd36db92865b0ba82e73c1fa86d4
This commit is contained in:
Valeriy Ponomaryov 2015-09-25 13:56:37 +03:00
parent 82e38ad2fa
commit 2f4795f7fe
4 changed files with 553 additions and 0 deletions

View File

@ -0,0 +1,113 @@
Manila mount automation example using share driver hooks feature
================================================================
Manila has feature called 'share driver hooks'. Which allows to perform
actions before and after driver actions such as 'create share' or
'access allow', also allows to do custom things on periodic basis.
Here, we provide example of mount automation using this feature.
This example uses OpenStack Zaqar project for sending notifications
when operations 'access allow' and 'access deny' are performed.
Server side hook will send notifications about changed access for shares
after granting and prior to denying access.
Possibilities of the mount automation example (consumer)
--------------------------------------------------------
- Supports only 'NFS' protocol.
- Supports only 'IP' rules.
- Supports both levels of access - 'RW' and 'RO'.
- Consume interval can be configured.
- Allows to choose parent mount directory.
Server side setup and run
-------------------------
1. Place files 'zaqarclientwrapper.py' and 'zaqar_notification.py' to dir
%manila_dir%/manila/share/hooks.
Then update manila configuration file with following options:
::
[share_backend_config_group]
hook_drivers = manila.share.hooks.zaqar_notification.ZaqarNotification
enable_pre_hooks = True
enable_post_hooks = True
enable_periodic_hooks = False
[zaqar]
zaqar_auth_url = http://%ip_of_endpoint_with_keystone%:35357/v2.0/
zaqar_region_name = %name_of_region_optional%
zaqar_username = foo_user
zaqar_password = foo_tenant
zaqar_project_name = foo_password
zaqar_queues = manila_notification
2. Restart manila-share service.
Consumer side setup and run
---------------------------
1. Place files 'zaqarclientwrapper.py' and
'zaqar_notification_example_consumer.py' to any dir on user machine, but they
both should be in the same dir.
2. Make sure that following dependencies are installed:
- PIP dependencies:
- netaddr
- oslo_concurrency
- oslo_config
- oslo_utils
- python-zaqarclient
- six
- System libs that install 'mount' and 'mount.nfs' apps.
3. Create file with following options:
::
[zaqar]
# Consumer-related options
sleep_between_consume_attempts = 7
mount_dir = "/tmp"
expected_ip_addresses = 10.254.0.4
# Common options for consumer and server sides
zaqar_auth_url = http://%ip_of_endpoint_with_keystone%:35357/v2.0/
zaqar_region_name = %name_of_region_optional%
zaqar_username = foo_user
zaqar_password = foo_tenant
zaqar_project_name = foo_password
zaqar_queues = manila_notification
Consumer options descriptions:
- 'sleep_between_consume_attempts' - wait interval between consuming
notifications from message queue.
- 'mount_dir' - parent mount directory that will contain all mounted shares
as subdirectories.
- 'expected_ip_addresses' - list of IP addresses that are expected
to be granted access for. Could be either equal to or be part of a CIDR.
Match triggers [un]mount operations.
4. Run consumer with following command:
::
$ zaqar_notification_example_consumer.py --config-file path/to/config.conf
5. Now create NFS share and grant IP access to consumer by its IP address.

View File

@ -0,0 +1,121 @@
# Copyright (c) 2015 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from oslo_utils import timeutils
from manila import exception
from manila.share import api
from manila.share import hook
from manila.share.hooks import zaqarclientwrapper # noqa
CONF = zaqarclientwrapper.CONF
LOG = log.getLogger(__name__)
ZAQARCLIENT = zaqarclientwrapper.ZAQARCLIENT
class ZaqarNotification(hook.HookBase):
share_api = api.API()
def _access_changed_trigger(self, context, func_name,
access_id, share_instance_id):
access = self.share_api.access_get(context, access_id=access_id)
share = self.share_api.get(context, share_id=access.share_id)
for ins in share.instances:
if ins.id == share_instance_id:
share_instance = ins
break
else:
raise exception.InstanceNotFound(instance_id=share_instance_id)
for ins in access.instance_mappings:
if ins.share_instance_id == share_instance_id:
access_instance = ins
break
else:
raise exception.InstanceNotFound(instance_id=share_instance_id)
is_allow_operation = 'allow' in func_name
results = {
'share_id': access.share_id,
'share_instance_id': share_instance_id,
'export_locations': [
el.path for el in share_instance.export_locations],
'share_proto': share.share_proto,
'access_id': access.id,
'access_instance_id': access_instance.id,
'access_type': access.access_type,
'access_to': access.access_to,
'access_level': access.access_level,
'access_state': access_instance.state,
'is_allow_operation': is_allow_operation,
'availability_zone': share_instance.availability_zone,
}
LOG.debug(results)
return results
def _execute_pre_hook(self, context, func_name, *args, **kwargs):
LOG.debug("\n PRE zaqar notification has been called for "
"method '%s'.\n" % func_name)
if func_name == "deny_access":
LOG.debug("\nSending notification about denied access.\n")
data = self._access_changed_trigger(
context,
func_name,
kwargs.get('access_id'),
kwargs.get('share_instance_id'),
)
self._send_notification(data)
def _execute_post_hook(self, context, func_name, pre_hook_data,
driver_action_results, *args, **kwargs):
LOG.debug("\n POST zaqar notification has been called for "
"method '%s'.\n" % func_name)
if func_name == "allow_access":
LOG.debug("\nSending notification about allowed access.\n")
data = self._access_changed_trigger(
context,
func_name,
kwargs.get('access_id'),
kwargs.get('share_instance_id'),
)
self._send_notification(data)
def _send_notification(self, data):
for queue_name in CONF.zaqar.zaqar_queues:
ZAQARCLIENT.queue_name = queue_name
message = {
"body": {
"example_message": (
"message generated at '%s'" % timeutils.utcnow()),
"data": data,
}
}
LOG.debug(
"\n Sending message %(m)s to '%(q)s' queue using '%(u)s' user "
"and '%(p)s' project." % {
'm': message,
'q': queue_name,
'u': CONF.zaqar.zaqar_username,
'p': CONF.zaqar.zaqar_project_name,
}
)
queue = ZAQARCLIENT.queue(queue_name)
queue.post(message)
def _execute_periodic_hook(self, context, periodic_hook_data,
*args, **kwargs):
LOG.debug("Periodic zaqar notification has been called. (Placeholder)")

View File

@ -0,0 +1,233 @@
#!/usr/bin/env python
#
# Copyright (c) 2015 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import os
import pprint
import signal
import sys
import time
import netaddr
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_utils import timeutils
import six
opts = [
cfg.IntOpt(
"consume_interval",
default=5,
deprecated_name="sleep_between_consume_attempts",
help=("Time that script will sleep between requests for consuming "
"Zaqar messages in seconds."),
),
cfg.StrOpt(
"mount_dir",
default="/tmp",
help="Directory that will contain all mounted shares."
),
cfg.ListOpt(
"expected_ip_addresses",
default=[],
help=("List of IP addresses that are expected to be found in access "
"rules to trigger [un]mount operation for a share.")
),
]
CONF = cfg.CONF
def print_with_time(data):
time = six.text_type(timeutils.utcnow())
print(time + " " + six.text_type(data))
def print_pretty_dict(d):
pprint.pprint(d)
def pop_zaqar_messages(client, queues_names):
if not isinstance(queues_names, (list, set, tuple)):
queues_names = (queues_names, )
try:
user = client.conf['auth_opts']['options']['os_username']
project = client.conf['auth_opts']['options']['os_project_name']
messages = []
for queue_name in queues_names:
queue = client.queue(queue_name)
messages.extend([six.text_type(m.body) for m in queue.pop()])
print_with_time(
"Received %(len)s message[s] from '%(q)s' "
"queue using '%(u)s' user and '%(p)s' project." % {
'len': len(messages),
'q': queue_name,
'u': user,
'p': project,
}
)
return messages
except Exception as e:
print_with_time("Caught exception - %s" % e)
return []
def signal_handler(signal, frame):
print("")
print_with_time("Ctrl+C was pressed. Shutting down consumer.")
sys.exit(0)
def parse_str_to_dict(string):
if not isinstance(string, six.string_types):
return string
result = eval(string)
return result
def handle_message(data):
"""Handles consumed message.
Expected structure of a message is following:
{'data': {
'access_id': u'b28268b9-36c6-40d3-a485-22534077328f',
'access_instance_id': u'd137b2cb-f549-4141-9dd7-36b2789fb973',
'access_level': u'rw',
'access_state': u'active',
'access_to': u'7.7.7.7',
'access_type': u'ip',
'availability_zone': u'nova',
'export_locations': [u'127.0.0.1:/path/to/nfs/share'],
'is_allow_operation': True,
'share_id': u'053eae9a-726f-4f7e-8502-49d7b1adf290',
'share_instance_id': u'dc33e554-e0b9-40f5-9046-c198716d73a0',
'share_proto': u'NFS'
}}
"""
if 'data' in data.keys():
data = data['data']
if (data.get('access_type', '?').lower() == 'ip' and
'access_state' in data.keys() and
'error' not in data.get('access_state', '?').lower() and
data.get('share_proto', '?').lower() == 'nfs'):
is_allow_operation = data['is_allow_operation']
export_location = data['export_locations'][0]
if is_allow_operation:
mount_share(export_location, data['access_to'])
else:
unmount_share(export_location, data['access_to'])
else:
print_with_time('Do nothing with above message.')
def execute(cmd):
try:
print_with_time('Executing following command: \n%s' % cmd)
cmd = cmd.split()
stdout, stderr = processutils.execute(*cmd)
if stderr:
print_with_time('Got error: %s' % stderr)
return stdout, stderr
except Exception as e:
print_with_time('Got following error: %s' % e)
return False, True
def is_share_mounted(mount_point):
mounts, stderr = execute('mount')
return mount_point in mounts
def rule_affects_me(ip_or_cidr):
if '/' in ip_or_cidr:
net = netaddr.IPNetwork(ip_or_cidr)
for my_ip in CONF.zaqar.expected_ip_addresses:
if netaddr.IPAddress(my_ip) in net:
return True
else:
for my_ip in CONF.zaqar.expected_ip_addresses:
if my_ip == ip_or_cidr:
return True
return False
def mount_share(export_location, access_to):
data = {
'mount_point': os.path.join(CONF.zaqar.mount_dir,
export_location.split('/')[-1]),
'export_location': export_location,
}
if (rule_affects_me(access_to) and
not is_share_mounted(data['mount_point'])):
print_with_time(
"Mounting '%(export_location)s' share to %(mount_point)s.")
execute('sudo mkdir -p %(mount_point)s' % data)
stdout, stderr = execute(
'sudo mount.nfs %(export_location)s %(mount_point)s' % data)
if stderr:
print_with_time("Mount operation failed.")
else:
print_with_time("Mount operation went OK.")
def unmount_share(export_location, access_to):
if rule_affects_me(access_to) and is_share_mounted(export_location):
print_with_time("Unmounting '%(export_location)s' share.")
stdout, stderr = execute('sudo umount %s' % export_location)
if stderr:
print_with_time("Unmount operation failed.")
else:
print_with_time("Unmount operation went OK.")
def main():
# Register other local modules
cur = os.path.dirname(__file__)
pathtest = os.path.join(cur)
sys.path.append(pathtest)
# Init configuration
CONF(sys.argv[1:], project="manila_notifier", version=1.0)
CONF.register_opts(opts, group="zaqar")
# Import common config and Zaqar client
import zaqarclientwrapper
# Handle SIGINT
signal.signal(signal.SIGINT, signal_handler)
# Run consumer
print_with_time("Consumer was successfully run.")
while(True):
messages = pop_zaqar_messages(
zaqarclientwrapper.ZAQARCLIENT, CONF.zaqar.zaqar_queues)
if not messages:
message = ("No new messages in '%s' queue[s] "
"found." % ','.join(CONF.zaqar.zaqar_queues))
else:
message = "Got following messages:"
print_with_time(message)
for message in messages:
message = parse_str_to_dict(message)
print_pretty_dict(message)
handle_message(message)
time.sleep(CONF.zaqar.consume_interval)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,86 @@
# Copyright (c) 2015 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from zaqarclient.queues import client as zaqar
zaqar_notification_opts = [
cfg.StrOpt(
"zaqar_username",
help="Username that should be used for init of zaqar client.",
),
cfg.StrOpt(
"zaqar_password",
secret=True,
help="Password for user specified in opt 'zaqar_username'.",
),
cfg.StrOpt(
"zaqar_project_name",
help=("Project/Tenant name that is owns user specified "
"in opt 'zaqar_username'."),
),
cfg.StrOpt(
"zaqar_auth_url",
default="http://127.0.0.1:35357/v2.0/",
help="Auth url to be used by Zaqar client.",
),
cfg.StrOpt(
"zaqar_region_name",
help="Name of the region that should be used. Optional.",
),
cfg.StrOpt(
"zaqar_service_type",
default="messaging",
help="Service type for Zaqar. Optional.",
),
cfg.StrOpt(
"zaqar_endpoint_type",
default="publicURL",
help="Type of endpoint to be used for init of Zaqar client. Optional.",
),
cfg.FloatOpt(
"zaqar_api_version",
default=1.1,
help="Version of Zaqar API to use. Optional.",
),
cfg.ListOpt(
"zaqar_queues",
default=["manila_notification_qeueue"],
help=("List of queues names to be used for sending Manila "
"notifications. Optional."),
),
]
CONF = cfg.CONF
CONF.register_opts(zaqar_notification_opts, group='zaqar')
ZAQARCLIENT = zaqar.Client(
version=CONF.zaqar.zaqar_api_version,
conf={
"auth_opts": {
"backend": "keystone",
"options": {
"os_username": CONF.zaqar.zaqar_username,
"os_password": CONF.zaqar.zaqar_password,
"os_project_name": CONF.zaqar.zaqar_project_name,
"os_auth_url": CONF.zaqar.zaqar_auth_url,
"os_region_name": CONF.zaqar.zaqar_region_name,
"os_service_type": CONF.zaqar.zaqar_service_type,
"os_endpoint_type": CONF.zaqar.zaqar_endpoint_type,
"insecure": True,
},
},
},
)