For systems with very large numbers of partitions, 1% dispersion coverage may simply be too much or take too long. This fix allows values below 1 (for example, 0.1) to be used for dispersion_coverage. DocImpact Change-Id: I5ed35b69754d55a410e66e658b3854de57c7666b
		
			
				
	
	
		
			161 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			161 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python
 | 
						|
# Copyright (c) 2010-2012 OpenStack, LLC.
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 | 
						|
# implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
import traceback
 | 
						|
from ConfigParser import ConfigParser
 | 
						|
from cStringIO import StringIO
 | 
						|
from sys import exit, argv, stdout
 | 
						|
from time import time
 | 
						|
from uuid import uuid4
 | 
						|
 | 
						|
from eventlet import GreenPool, patcher, sleep
 | 
						|
from eventlet.pools import Pool
 | 
						|
 | 
						|
from swiftclient import Connection, get_auth
 | 
						|
from swift.common.ring import Ring
 | 
						|
from swift.common.utils import compute_eta, get_time_units
 | 
						|
 | 
						|
 | 
						|
def put_container(connpool, container, report):
    """Create one container using a connection borrowed from the pool.

    :param connpool: pool whose ``item()`` context manager yields a
                     connection exposing ``put_container`` and ``attempts``
    :param container: name of the container to create
    :param report: optional callable invoked with True on success or False
                   on failure; pass a falsy value to skip reporting
    :raises Exception: whatever the PUT (or the success report) raised is
                       re-raised after the failure has been reported
    """
    global retries_done
    try:
        with connpool.item() as client:
            client.put_container(container)
            # Presumably ``attempts`` includes the initial try, so anything
            # beyond the first attempt is tallied as a retry.
            extra_attempts = client.attempts - 1
            retries_done = retries_done + extra_attempts
        if report:
            report(True)
    except Exception:
        if not report:
            raise
        report(False)
        raise
 | 
						|
 | 
						|
 | 
						|
def put_object(connpool, container, obj, report):
    """Upload one marker object using a connection borrowed from the pool.

    The object's name is reused as both its body and the value of its
    ``x-object-meta-dispersion`` metadata header.

    :param connpool: pool whose ``item()`` context manager yields a
                     connection exposing ``put_object`` and ``attempts``
    :param container: name of the container receiving the object
    :param obj: object name (also used as content and metadata value)
    :param report: optional callable invoked with True on success or False
                   on failure; pass a falsy value to skip reporting
    :raises Exception: whatever the PUT (or the success report) raised is
                       re-raised after the failure has been reported
    """
    global retries_done
    try:
        with connpool.item() as client:
            marker_headers = {'x-object-meta-dispersion': obj}
            client.put_object(container, obj, StringIO(obj),
                              headers=marker_headers)
            # Presumably ``attempts`` includes the initial try, so anything
            # beyond the first attempt is tallied as a retry.
            extra_attempts = client.attempts - 1
            retries_done = retries_done + extra_attempts
        if report:
            report(True)
    except Exception:
        if not report:
            raise
        report(False)
        raise
 | 
						|
 | 
						|
 | 
						|
def report(success):
 | 
						|
    global begun, created, item_type, next_report, need_to_create, retries_done
 | 
						|
    if not success:
 | 
						|
        traceback.print_exc()
 | 
						|
        exit('Gave up due to error(s).')
 | 
						|
    created += 1
 | 
						|
    if time() < next_report:
 | 
						|
        return
 | 
						|
    next_report = time() + 5
 | 
						|
    eta, eta_unit = compute_eta(begun, created, need_to_create)
 | 
						|
    print '\r\x1B[KCreating %s: %d of %d, %d%s left, %d retries' % (item_type,
 | 
						|
          created, need_to_create, round(eta), eta_unit, retries_done),
 | 
						|
    stdout.flush()
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    global begun, created, item_type, next_report, need_to_create, retries_done
 | 
						|
    patcher.monkey_patch()
 | 
						|
 | 
						|
    conffile = '/etc/swift/dispersion.conf'
 | 
						|
    if len(argv) == 2:
 | 
						|
        conffile = argv[1]
 | 
						|
    elif len(argv) > 2:
 | 
						|
        exit('Syntax: %s [conffile]' % argv[0])
 | 
						|
    c = ConfigParser()
 | 
						|
    if not c.read(conffile):
 | 
						|
        exit('Unable to read config file: %s' % conffile)
 | 
						|
    conf = dict(c.items('dispersion'))
 | 
						|
    swift_dir = conf.get('swift_dir', '/etc/swift')
 | 
						|
    dispersion_coverage = float(conf.get('dispersion_coverage', 1))
 | 
						|
    retries = int(conf.get('retries', 5))
 | 
						|
    concurrency = int(conf.get('concurrency', 25))
 | 
						|
    endpoint_type = str(conf.get('endpoint_type', 'publicURL'))
 | 
						|
 | 
						|
    coropool = GreenPool(size=concurrency)
 | 
						|
    retries_done = 0
 | 
						|
 | 
						|
    os_options = {'endpoint_type': endpoint_type}
 | 
						|
 | 
						|
    url, token = get_auth(conf['auth_url'], conf['auth_user'],
 | 
						|
                          conf['auth_key'],
 | 
						|
                          auth_version=conf.get('auth_version', '1.0'),
 | 
						|
                          os_options=os_options)
 | 
						|
    account = url.rsplit('/', 1)[1]
 | 
						|
    connpool = Pool(max_size=concurrency)
 | 
						|
    connpool.create = lambda: Connection(conf['auth_url'],
 | 
						|
                                         conf['auth_user'], conf['auth_key'],
 | 
						|
                                         retries=retries,
 | 
						|
                                         preauthurl=url, preauthtoken=token,
 | 
						|
                                         os_options=os_options)
 | 
						|
 | 
						|
    container_ring = Ring(swift_dir, ring_name='container')
 | 
						|
    parts_left = dict((x, x) for x in xrange(container_ring.partition_count))
 | 
						|
    item_type = 'containers'
 | 
						|
    created = 0
 | 
						|
    retries_done = 0
 | 
						|
    need_to_create = need_to_queue = \
 | 
						|
        dispersion_coverage / 100.0 * container_ring.partition_count
 | 
						|
    begun = next_report = time()
 | 
						|
    next_report += 2
 | 
						|
    while need_to_queue >= 1:
 | 
						|
        container = 'dispersion_%s' % uuid4().hex
 | 
						|
        part, _junk = container_ring.get_nodes(account, container)
 | 
						|
        if part in parts_left:
 | 
						|
            coropool.spawn(put_container, connpool, container, report)
 | 
						|
            sleep()
 | 
						|
            del parts_left[part]
 | 
						|
            need_to_queue -= 1
 | 
						|
    coropool.waitall()
 | 
						|
    elapsed, elapsed_unit = get_time_units(time() - begun)
 | 
						|
    print '\r\x1B[KCreated %d containers for dispersion reporting, %d%s, %d ' \
 | 
						|
          'retries' % \
 | 
						|
          (need_to_create, round(elapsed), elapsed_unit, retries_done)
 | 
						|
    stdout.flush()
 | 
						|
 | 
						|
    container = 'dispersion_objects'
 | 
						|
    put_container(connpool, container, None)
 | 
						|
    object_ring = Ring(swift_dir, ring_name='object')
 | 
						|
    parts_left = dict((x, x) for x in xrange(object_ring.partition_count))
 | 
						|
    item_type = 'objects'
 | 
						|
    created = 0
 | 
						|
    retries_done = 0
 | 
						|
    need_to_create = need_to_queue = \
 | 
						|
        dispersion_coverage / 100.0 * object_ring.partition_count
 | 
						|
    begun = next_report = time()
 | 
						|
    next_report += 2
 | 
						|
    while need_to_queue >= 1:
 | 
						|
        obj = 'dispersion_%s' % uuid4().hex
 | 
						|
        part, _junk = object_ring.get_nodes(account, container, obj)
 | 
						|
        if part in parts_left:
 | 
						|
            coropool.spawn(put_object, connpool, container, obj, report)
 | 
						|
            sleep()
 | 
						|
            del parts_left[part]
 | 
						|
            need_to_queue -= 1
 | 
						|
    coropool.waitall()
 | 
						|
    elapsed, elapsed_unit = get_time_units(time() - begun)
 | 
						|
    print '\r\x1B[KCreated %d objects for dispersion reporting, %d%s, %d ' \
 | 
						|
          'retries' % \
 | 
						|
          (need_to_create, round(elapsed), elapsed_unit, retries_done)
 | 
						|
    stdout.flush()
 |