manila/contrib/share_driver_hooks/zaqar_notification_example_consumer.py
Igor Malinovskiy b1b723ad0b Add update_access() method to driver interface
- Add update_access() method to driver interface
- Move all code related to access operations to ShareInstanceAccess
class
- Statuses from individual access rules are now mapped to
share_instance's access_rules_status
- Add 'access_rules_status' field to share instance, which indicates
current status of applying access rules

APIImpact
Co-Authored-By: Rodrigo Barbieri <rodrigo.barbieri@fit-tecnologia.org.br>
Co-Authored-By: Tiago Pasqualini da Silva <tiago.pasqualini@gmail.com>
Implements: bp new-share-access-driver-interface

Change-Id: Iff1ec2e3176a46e9f6bd383b38ffc5d838aa8bb8
2016-02-05 10:41:51 -02:00

243 lines
7.4 KiB
Python
Executable File

#!/usr/bin/env python
#
# Copyright (c) 2015 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import os
import pprint
import signal
import sys
import time
import netaddr
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_utils import timeutils
import six
opts = [
cfg.IntOpt(
"consume_interval",
default=5,
deprecated_name="sleep_between_consume_attempts",
help=("Time that script will sleep between requests for consuming "
"Zaqar messages in seconds."),
),
cfg.StrOpt(
"mount_dir",
default="/tmp",
help="Directory that will contain all mounted shares."
),
cfg.ListOpt(
"expected_ip_addresses",
default=[],
help=("List of IP addresses that are expected to be found in access "
"rules to trigger [un]mount operation for a share.")
),
]
CONF = cfg.CONF
def print_with_time(data):
time = six.text_type(timeutils.utcnow())
print(time + " " + six.text_type(data))
def print_pretty_dict(d):
pprint.pprint(d)
def pop_zaqar_messages(client, queues_names):
if not isinstance(queues_names, (list, set, tuple)):
queues_names = (queues_names, )
try:
user = client.conf['auth_opts']['options']['os_username']
project = client.conf['auth_opts']['options']['os_project_name']
messages = []
for queue_name in queues_names:
queue = client.queue(queue_name)
messages.extend([six.text_type(m.body) for m in queue.pop()])
print_with_time(
"Received %(len)s message[s] from '%(q)s' "
"queue using '%(u)s' user and '%(p)s' project." % {
'len': len(messages),
'q': queue_name,
'u': user,
'p': project,
}
)
return messages
except Exception as e:
print_with_time("Caught exception - %s" % e)
return []
def signal_handler(signal, frame):
print("")
print_with_time("Ctrl+C was pressed. Shutting down consumer.")
sys.exit(0)
def parse_str_to_dict(string):
if not isinstance(string, six.string_types):
return string
result = eval(string)
return result
def handle_message(data):
"""Handles consumed message.
Expected structure of a message is following:
{'data': {
'access_rules': [
{
'access_id': u'b28268b9-36c6-40d3-a485-22534077328f',
'access_instance_id':
u'd137b2cb-f549-4141-9dd7-36b2789fb973',
'access_level': u'rw',
'access_state': u'active',
'access_to': u'7.7.7.7',
'access_type': u'ip',
}
],
'availability_zone': u'nova',
'export_locations': [u'127.0.0.1:/path/to/nfs/share'],
'is_allow_operation': True,
'share_id': u'053eae9a-726f-4f7e-8502-49d7b1adf290',
'share_instance_id': u'dc33e554-e0b9-40f5-9046-c198716d73a0',
'share_proto': u'NFS'
}}
"""
if 'data' in data.keys():
data = data['data']
valid_access = (
'access_rules' in data and len(data['access_rules']) == 1 and
data['access_rules'][0].get('access_type', '?').lower() == 'ip' and
data.get('share_proto', '?').lower() == 'nfs'
)
if valid_access:
is_allow_operation = data['is_allow_operation']
export_location = data['export_locations'][0]
if is_allow_operation:
mount_share(export_location, data['access_to'])
else:
unmount_share(export_location, data['access_to'])
else:
print_with_time('Do nothing with above message.')
def execute(cmd):
try:
print_with_time('Executing following command: \n%s' % cmd)
cmd = cmd.split()
stdout, stderr = processutils.execute(*cmd)
if stderr:
print_with_time('Got error: %s' % stderr)
return stdout, stderr
except Exception as e:
print_with_time('Got following error: %s' % e)
return False, True
def is_share_mounted(mount_point):
mounts, stderr = execute('mount')
return mount_point in mounts
def rule_affects_me(ip_or_cidr):
if '/' in ip_or_cidr:
net = netaddr.IPNetwork(ip_or_cidr)
for my_ip in CONF.zaqar.expected_ip_addresses:
if netaddr.IPAddress(my_ip) in net:
return True
else:
for my_ip in CONF.zaqar.expected_ip_addresses:
if my_ip == ip_or_cidr:
return True
return False
def mount_share(export_location, access_to):
data = {
'mount_point': os.path.join(CONF.zaqar.mount_dir,
export_location.split('/')[-1]),
'export_location': export_location,
}
if (rule_affects_me(access_to) and
not is_share_mounted(data['mount_point'])):
print_with_time(
"Mounting '%(export_location)s' share to %(mount_point)s.")
execute('sudo mkdir -p %(mount_point)s' % data)
stdout, stderr = execute(
'sudo mount.nfs %(export_location)s %(mount_point)s' % data)
if stderr:
print_with_time("Mount operation failed.")
else:
print_with_time("Mount operation went OK.")
def unmount_share(export_location, access_to):
if rule_affects_me(access_to) and is_share_mounted(export_location):
print_with_time("Unmounting '%(export_location)s' share.")
stdout, stderr = execute('sudo umount %s' % export_location)
if stderr:
print_with_time("Unmount operation failed.")
else:
print_with_time("Unmount operation went OK.")
def main():
# Register other local modules
cur = os.path.dirname(__file__)
pathtest = os.path.join(cur)
sys.path.append(pathtest)
# Init configuration
CONF(sys.argv[1:], project="manila_notifier", version=1.0)
CONF.register_opts(opts, group="zaqar")
# Import common config and Zaqar client
import zaqarclientwrapper
# Handle SIGINT
signal.signal(signal.SIGINT, signal_handler)
# Run consumer
print_with_time("Consumer was successfully run.")
while(True):
messages = pop_zaqar_messages(
zaqarclientwrapper.ZAQARCLIENT, CONF.zaqar.zaqar_queues)
if not messages:
message = ("No new messages in '%s' queue[s] "
"found." % ','.join(CONF.zaqar.zaqar_queues))
else:
message = "Got following messages:"
print_with_time(message)
for message in messages:
message = parse_str_to_dict(message)
print_pretty_dict(message)
handle_message(message)
time.sleep(CONF.zaqar.consume_interval)
if __name__ == '__main__':
main()