
191 lines
5.2 KiB

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""System-level utilities and helper functions."""
import datetime
import sys
import uuid
from eventlet import event
from eventlet import greenthread
from eventlet import semaphore
from eventlet.green import subprocess
from heat.common import exception
# strftime/strptime pattern with microsecond precision; used as the default
# ``fmt`` argument of strtime() and parse_strtime() in this module.
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
def chunkreadable(iter, chunk_size=65536):
    """Wrap a readable iterator with a reader yielding chunks of
    a preferred size, otherwise leave iterator unchanged.

    :param iter: an iter which may also be readable
    :param chunk_size: maximum size of chunk
    :returns: ``chunkiter(iter, chunk_size)`` when ``iter`` has a ``read``
              attribute, otherwise ``iter`` itself
    """
    # NOTE: the parameter name shadows the builtin ``iter``; it is kept
    # unchanged because callers may pass it by keyword.
    if hasattr(iter, 'read'):
        return chunkiter(iter, chunk_size)
    return iter
def chunkiter(fp, chunk_size=65536):
    """Return an iterator to a file-like obj which yields fixed size chunks

    Iteration stops as soon as ``fp.read`` returns an empty (falsy) result,
    so the final chunk may be shorter than ``chunk_size``.

    :param fp: a file-like object
    :param chunk_size: maximum size of chunk
    """
    while True:
        chunk = fp.read(chunk_size)
        if not chunk:
            # End of stream: without this exit the loop would spin forever
            # re-reading an exhausted file object.
            break
        yield chunk
def import_class(import_str):
    """Returns a class from a string including module and class

    :param import_str: dotted path whose last component is the attribute to
                       fetch and whose prefix is the module to import,
                       e.g. ``'package.module.ClassName'``
    :returns: the attribute named by the last path component
    :raises exception.ImportFailure: if the module cannot be imported, the
            module part is empty, or the attribute does not exist
    """
    mod_str, _sep, class_str = import_str.rpartition('.')
    try:
        # Make sure the module is actually loaded; looking it up in
        # sys.modules alone would raise an uncaught KeyError for modules
        # nobody has imported yet.
        __import__(mod_str)
        return getattr(sys.modules[mod_str], class_str)
    except (ImportError, ValueError, AttributeError) as e:
        # NOTE(review): the original call was cut off after
        # ``import_str=import_str,``; passing the caught exception as
        # ``reason`` is an assumption -- confirm against
        # exception.ImportFailure.
        raise exception.ImportFailure(import_str=import_str,
                                      reason=e)
def import_object(import_str):
    """Returns an object including a module or module and class

    The string is first treated as a module path; if it imports, the module
    object itself is returned. Otherwise it is treated as a
    ``module.ClassName`` path and a new instance, created with no
    arguments, is returned.

    :param import_str: dotted path of a module or of a class
    :returns: the imported module, or an instance of the named class
    """
    try:
        # Attempt the import explicitly so that the ImportError fallback
        # below is reachable for paths that name a class, not a module.
        __import__(import_str)
        return sys.modules[import_str]
    except ImportError:
        cls = import_class(import_str)
        return cls()
def generate_uuid():
    """Return a freshly generated random (version 4) UUID as a string."""
    return str(uuid.uuid4())
def gen_uuid():
    """Return a freshly generated random (version 4) ``uuid.UUID`` object.

    Unlike generate_uuid(), the result is not converted to a string.
    """
    return uuid.uuid4()
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
    """Returns formatted utcnow.

    :param at: datetime to format; when omitted (or falsy) the value of
               utcnow() is used
    :param fmt: strftime format string
    :returns: ``at`` rendered with ``fmt``
    """
    if not at:
        at = utcnow()
    return at.strftime(fmt)
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
    """Turn a formatted time back into a datetime.

    :param timestr: string previously produced with ``fmt``
    :param fmt: strptime format string
    :returns: the parsed ``datetime.datetime``
    :raises ValueError: propagated from ``strptime`` when ``timestr`` does
            not match ``fmt``
    """
    return datetime.datetime.strptime(timestr, fmt)
def isotime(at=None):
    """Stringify time in ISO 8601 format

    :param at: datetime to format; defaults to the current UTC time
    :returns: the formatted timestamp followed by ``'Z'`` when the zone
              name is ``'UTC'`` (or ``at`` is naive), otherwise followed by
              the zone name reported by ``at.tzinfo``
    """
    if not at:
        at = datetime.datetime.utcnow()
    # NOTE(review): ISO_TIME_FORMAT is not defined in the visible part of
    # this module -- confirm it exists before relying on this function.
    stamp = at.strftime(ISO_TIME_FORMAT)
    # Naive datetimes are presumed to already be in UTC.
    tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
    stamp += ('Z' if tz == 'UTC' else tz)
    return stamp
def parse_isotime(timestr):
    """Turn an iso formatted time back into a datetime.

    :param timestr: ISO 8601 formatted string
    :returns: whatever ``iso8601.parse_date`` returns for ``timestr``
    :raises ValueError: if ``timestr`` cannot be parsed or is of the wrong
            type
    """
    # NOTE(review): ``iso8601`` is not imported in the visible part of this
    # module -- confirm the import exists.
    try:
        return iso8601.parse_date(timestr)
    except (iso8601.ParseError, TypeError) as e:
        # ``e.message`` is deprecated/removed on newer Pythons; str(e)
        # carries the same text.
        raise ValueError(str(e))
def normalize_time(timestamp):
    """Normalize time in arbitrary timezone to UTC

    :param timestamp: a naive or timezone-aware ``datetime.datetime``
    :returns: ``timestamp`` unchanged when it is naive; otherwise a naive
              datetime expressing the same instant in UTC
    """
    offset = timestamp.utcoffset()
    if offset is None:
        # Naive datetime: nothing to convert.
        return timestamp
    # Compare against None rather than truthiness: a zero offset (an aware
    # UTC timestamp) is falsy and would otherwise be returned still carrying
    # its tzinfo.
    return timestamp.replace(tzinfo=None) - offset
def utcnow():
    """Overridable version of utils.utcnow.

    :returns: ``utcnow.override_time`` when that attribute is set to a
              truthy value, otherwise ``datetime.datetime.utcnow()``
    """
    if utcnow.override_time:
        return utcnow.override_time
    return datetime.datetime.utcnow()


# Assign a datetime to this function attribute to pin the value utcnow()
# returns; reset it to None to go back to the real clock.
utcnow.override_time = None
class LoopingCallDone(Exception):
    """Exception to break out and stop a LoopingCall.

    The poll-function passed to LoopingCall can raise this exception to
    break out of the loop normally (loosely comparable to raising
    StopIteration to end an iterator).

    An optional return-value can be included as the argument to the exception;
    this return-value will be returned by LoopingCall.wait()
    """

    def __init__(self, retvalue=True):
        """:param retvalue: Value that LoopingCall.wait() should return."""
        self.retvalue = retvalue
class LoopingCall(object):
    """Repeatedly invoke a callable in a green thread at a fixed interval.

    NOTE(review): several statement bodies in start() were missing from the
    source (empty ``if``/``except`` suites, no ``try``, nothing spawning the
    loop or signalling ``done``). They have been reconstructed from the
    eventlet modules this file imports; confirm against the intended
    behaviour.
    """

    def __init__(self, f=None, *args, **kw):
        """
        :param f: callable invoked on every iteration
        :param args: positional arguments passed to ``f``
        :param kw: keyword arguments passed to ``f``
        """
        self.args = args
        self.kw = kw
        self.f = f
        self._running = False

    def start(self, interval, now=True):
        """Start the loop in a new green thread.

        :param interval: seconds to sleep between calls of ``f``
        :param now: when false, sleep for ``interval`` before the first call
        :returns: the event that wait() blocks on
        """
        self._running = True
        done = event.Event()

        def _inner():
            if not now:
                greenthread.sleep(interval)
            try:
                while self._running:
                    self.f(*self.args, **self.kw)
                    if not self._running:
                        # stop() was called during f(); skip the sleep.
                        break
                    greenthread.sleep(interval)
            except LoopingCallDone as e:
                # Normal termination requested by the poll function.
                self.stop()
                done.send(e.retvalue)
            except Exception:
                # NOTE(review): ``LOG`` and ``_`` are not defined in the
                # visible part of this module -- confirm they exist.
                LOG.exception(_('in looping call'))
                # Re-raise the failure in whoever is blocked in wait().
                done.send_exception(*sys.exc_info())
                return
            else:
                # Loop ended because stop() cleared the running flag.
                done.send(True)

        self.done = done

        greenthread.spawn(_inner)
        return self.done

    def stop(self):
        """Ask the loop to exit after the current iteration."""
        self._running = False

    def wait(self):
        """Block until the loop finishes and return its result.

        Only valid after start() has been called, since start() is what
        creates ``self.done``.
        """
        return self.done.wait()