Renaming Artifice to Distil

Lots of general rename changes, and a few pieces of minor tidy-up.

Change-Id: Ia88b75af0e2d294cfc57164ac42155234d8ba71a
Author: adriant
Date: 2014-06-12 12:52:18 +12:00
Parent: 7225d81a20
Commit: fcfa0f077d
29 changed files with 51 additions and 67 deletions

distil/__init__.py (new file, +0)

distil/api/__init__.py (new file, +0)

distil/api/helpers.py (new file, +70)

@@ -0,0 +1,70 @@
from decorator import decorator
import flask
import itertools
import json
from distil.models import Tenant
def _validate(data, *args, **kwargs):
    for key in itertools.chain(args, kwargs.keys()):
        if key not in data:
            flask.abort(400, json.dumps({'error': 'missing parameter',
                                         'param': key}))
    for key, val in kwargs.iteritems():
        # each keyword value is a validation predicate for its key
        if not val(data[key]):
            flask.abort(400, json.dumps({'error': 'validation failed',
                                         'param': key}))
def must(*args, **kwargs):
"""
Asserts that a given set of keys are present in the request parameters.
Also allows for keyword args to handle validation.
"""
def tester(func):
        def funky(f, *iargs, **ikwargs):
            # the decorator module passes the wrapped function as the
            # first argument (as dejson below expects); flask exposes
            # merged query/form parameters as request.values
            _validate(flask.request.values, *args, **kwargs)
            return func(*iargs, **ikwargs)
return decorator(funky, func)
return tester
@decorator
def returns_json(func, *args, **kwargs):
status, content = func(*args, **kwargs)
response = flask.make_response(
json.dumps(content), status)
response.headers['Content-type'] = 'application/json'
return response
def json_must(*args, **kwargs):
    """Asserts that a given set of keys is present in the JSON body of
    the request, which must have an application/json content type."""
def unpack(func):
def dejson(f, *iargs):
if (flask.request.headers.get('content-type', '') !=
"application/json"):
flask.abort(400, json.dumps(
{"error": "must be in JSON format"})
)
# todo -- parse_float was handled specially
_validate(flask.request.json, *args, **kwargs)
return func(*iargs)
return decorator(dejson, func)
return unpack
def validate_tenant_id(tenant_id, session):
    """Checks that the given tenant ID is a well-formed unicode string,
    and that a tenant with that ID exists.
    Returns the matching tenant, or a (status, body) tuple on failure."""
if isinstance(tenant_id, unicode):
tenant_query = session.query(Tenant).\
filter(Tenant.id == tenant_id)
if tenant_query.count() == 0:
return 400, {"errors": ["No tenant matching ID found."]}
elif tenant_id is not None:
return 400, {"error": ["tenant must be a unicode string."]}
else:
return 400, {"missing parameter": {"tenant": "Tenant id."}}
return tenant_query[0]
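
A minimal sketch of how these helpers compose on a Flask view (the route
and payload here are illustrative, not part of this commit; app is the
Blueprint from distil/api/web.py):

    from distil.api.helpers import must, returns_json

    @app.route("example", methods=["GET"])
    @must("tenant")   # aborts with 400 unless ?tenant=... is present
    @returns_json     # view returns (status, dict); helper builds the response
    def example():
        return 200, {"ok": True}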

distil/api/web.py (new file, +483)

@@ -0,0 +1,483 @@
import flask
from flask import Flask, Blueprint
from distil import database, config
from distil.constants import iso_time, iso_date, dawn_of_time
from distil.transformers import active_transformers
from distil.rates import RatesFile
from distil.models import SalesOrder, _Last_Run
from distil.helpers import convert_to
from distil.interface import Interface, timed
import sqlalchemy
from sqlalchemy import create_engine, func
from sqlalchemy.orm import scoped_session, create_session
from sqlalchemy.pool import NullPool
from datetime import datetime, timedelta
import json
import logging as log
from .helpers import returns_json, json_must, validate_tenant_id
engine = None
Session = None
app = Blueprint("main", __name__)
DEFAULT_TIMEZONE = "Pacific/Auckland"
def get_app(conf):
actual_app = Flask(__name__)
actual_app.register_blueprint(app, url_prefix="/")
config.setup_config(conf)
global engine
engine = create_engine(config.main["database_uri"], poolclass=NullPool)
global Session
Session = scoped_session(lambda: create_session(bind=engine))
if config.main.get("timezone"):
global DEFAULT_TIMEZONE
DEFAULT_TIMEZONE = config.main["timezone"]
log.basicConfig(filename=config.main["log_file"],
level=log.INFO,
format='%(asctime)s %(message)s')
log.info("Billing API started.")
return actual_app
def generate_windows(start, end):
"""Generator for 1 hour windows in a given range."""
window_size = timedelta(hours=1)
while start + window_size <= end:
window_end = start + window_size
yield start, window_end
start = window_end
def collect_usage(tenant, db, session, resp, end):
"""Collects usage for a given tenant from when they were last collected,
up to the given end, and breaks the range into one hour windows."""
run_once = False
timestamp = datetime.utcnow()
session.begin(subtransactions=True)
log.info('collect_usage for %s %s' % (tenant.id, tenant.name))
db_tenant = db.insert_tenant(tenant.id, tenant.name,
tenant.description, timestamp)
start = db_tenant.last_collected
session.commit()
trust_sources = set(config.main.get('trust_sources', []))
for window_start, window_end in generate_windows(start, end):
with timed("new transaction"):
session.begin(subtransactions=True)
try:
log.info("%s %s slice %s %s" % (tenant.id, tenant.name,
window_start, window_end))
mappings = config.collection['meter_mappings']
for meter_name, meter_info in mappings.items():
usage = tenant.usage(meter_name, window_start, window_end)
usage_by_resource = {}
transformer = active_transformers[meter_info['transformer']]()
with timed("filter and group by resource"):
for u in usage:
# the user can make their own samples, including those
# that would collide with what we care about for
# billing.
# if we have a list of trust sources configured, then
# discard everything not matching.
if trust_sources and u['source'] not in trust_sources:
log.warning('ignoring untrusted usage sample ' +
'from source `%s`' % u['source'])
continue
resource_id = u['resource_id']
entries = usage_by_resource.setdefault(resource_id, [])
entries.append(u)
with timed("apply transformer + insert"):
for res, entries in usage_by_resource.items():
# apply the transformer.
transformed = transformer.transform_usage(
meter_name, entries, window_start, window_end)
db.insert_resource(tenant.id, res, meter_info['type'],
timestamp, entries[-1])
db.insert_usage(tenant.id, res, transformed,
meter_info['unit'],
window_start, window_end, timestamp)
with timed("commit insert"):
# update the timestamp for the tenant so we won't examine this
# timespan again.
db_tenant.last_collected = window_end
session.add(db_tenant)
session.commit()
resp["tenants"].append(
{"id": tenant.id,
"updated": True,
"start": window_start.strftime(iso_time),
"end": window_end.strftime(iso_time)
}
)
run_once = True
except sqlalchemy.exc.IntegrityError:
# this is fine.
session.rollback()
resp["tenants"].append(
{"id": tenant.id,
"updated": False,
"error": "Integrity error",
"start": window_start.strftime(iso_time),
"end": window_end.strftime(iso_time)
}
)
resp["errors"] += 1
log.warning("IntegrityError for %s %s in window: %s - %s " %
(tenant.name, tenant.id,
window_start.strftime(iso_time),
window_end.strftime(iso_time)))
return run_once
return run_once
@app.route("collect_usage", methods=["POST"])
def run_usage_collection():
"""Run usage collection on all tenants present in Keystone."""
try:
log.info("Usage collection run started.")
session = Session()
interface = Interface()
db = database.Database(session)
end = datetime.utcnow().\
replace(minute=0, second=0, microsecond=0)
tenants = interface.tenants
resp = {"tenants": [], "errors": 0}
run_once = False
for tenant in tenants:
if collect_usage(tenant, db, session, resp, end):
run_once = True
        if run_once:
session.begin()
last_run = session.query(_Last_Run)
if last_run.count() == 0:
last_run = _Last_Run(last_run=end)
session.add(last_run)
session.commit()
else:
last_run[0].last_run = end
session.commit()
session.close()
log.info("Usage collection run complete.")
return json.dumps(resp)
except Exception as e:
        log.critical('Exception escaped! %s: %s' % (type(e), e))
import traceback
traceback.print_exc()
def build_tenant_dict(tenant, entries, db):
"""Builds a dict structure for a given tenant."""
    tenant_dict = {'name': tenant.name, 'tenant_id': tenant.id,
                   'resources': {}}
for entry in entries:
service = {'name': entry.service, 'volume': entry.volume,
'unit': entry.unit}
        if entry.resource_id not in tenant_dict['resources']:
resource = db.get_resource_metadata(entry.resource_id)
resource['services'] = [service]
tenant_dict['resources'][entry.resource_id] = resource
else:
resource = tenant_dict['resources'][entry.resource_id]
resource['services'].append(service)
return tenant_dict
def add_costs_for_tenant(tenant, RatesManager):
"""Adds cost values to services using the given rates manager."""
tenant_total = 0
for resource in tenant['resources'].values():
resource_total = 0
for service in resource['services']:
try:
rate = RatesManager.rate(service['name'])
except KeyError:
# no rate exists for this service
service['cost'] = "0"
service['volume'] = "unknown unit conversion"
service['unit'] = "unknown"
service['rate'] = "missing rate"
continue
volume = convert_to(service['volume'],
service['unit'],
rate['unit'])
            # round to 2 decimal places, i.e. to whole cents
cost = round(volume * rate['rate'], 2)
service['cost'] = str(cost)
service['volume'] = str(volume)
service['unit'] = rate['unit']
service['rate'] = str(rate['rate'])
resource_total += cost
resource['total_cost'] = str(resource_total)
tenant_total += resource_total
tenant['total_cost'] = str(tenant_total)
return tenant
def generate_sales_order(draft, tenant_id, end):
"""Generates a sales order dict, and unless draft is true,
creates a database entry for sales_order."""
session = Session()
db = database.Database(session)
valid_tenant = validate_tenant_id(tenant_id, session)
if isinstance(valid_tenant, tuple):
return valid_tenant
rates = RatesFile(config.rates_config)
    # Get the last sales order for this tenant, to establish
    # the proper range.
start = session.query(func.max(SalesOrder.end).label('end')).\
filter(SalesOrder.tenant_id == tenant_id).first().end
if not start:
start = dawn_of_time
    # these conditionals need work, and we need some way to
    # ensure all given datetime values are in UTC
if end <= start:
return 400, {"errors": ["end date must be greater than " +
"the end of the last sales order range."]}
if end > datetime.utcnow():
return 400, {"errors": ["end date cannot be a future date."]}
usage = db.usage(start, end, tenant_id)
session.begin()
if not draft:
order = SalesOrder(tenant_id=tenant_id, start=start, end=end)
session.add(order)
try:
# Commit the record before we generate the bill, to mark this as a
# billed region of data. Avoids race conditions by marking a tenant
# BEFORE we start to generate the data for it.
session.commit()
# Transform the query result into a billable dict.
tenant_dict = build_tenant_dict(valid_tenant, usage, db)
tenant_dict = add_costs_for_tenant(tenant_dict, rates)
# add sales order range:
tenant_dict['start'] = str(start)
tenant_dict['end'] = str(end)
session.close()
if not draft:
log.info("Sales Order #%s Generated for %s in range: %s - %s" %
(order.id, tenant_id, start, end))
return 200, tenant_dict
except sqlalchemy.exc.IntegrityError:
session.rollback()
session.close()
log.warning("IntegrityError creating sales-order for " +
"%s %s in range: %s - %s " %
(valid_tenant.name, valid_tenant.id, start, end))
return 400, {"id": tenant_id,
"error": "IntegrityError, existing sales_order overlap."}
def regenerate_sales_order(tenant_id, target):
"""Finds a sales order entry nearest to the target,
and returns a salesorder dict based on the entry."""
session = Session()
db = database.Database(session)
rates = RatesFile(config.rates_config)
valid_tenant = validate_tenant_id(tenant_id, session)
if isinstance(valid_tenant, tuple):
return valid_tenant
try:
sales_order = db.get_sales_orders(tenant_id, target, target)[0]
except IndexError:
return 400, {"errors": ["Given date not in existing sales orders."]}
usage = db.usage(sales_order.start, sales_order.end, tenant_id)
# Transform the query result into a billable dict.
tenant_dict = build_tenant_dict(valid_tenant, usage, db)
tenant_dict = add_costs_for_tenant(tenant_dict, rates)
# add sales order range:
tenant_dict['start'] = str(sales_order.start)
tenant_dict['end'] = str(sales_order.end)
return 200, tenant_dict
def regenerate_sales_order_range(tenant_id, start, end):
"""For all sales orders in a given range, generate sales order dicts,
and return them."""
session = Session()
db = database.Database(session)
rates = RatesFile(config.rates_config)
valid_tenant = validate_tenant_id(tenant_id, session)
if isinstance(valid_tenant, tuple):
return valid_tenant
sales_orders = db.get_sales_orders(tenant_id, start, end)
tenants = []
for sales_order in sales_orders:
usage = db.usage(sales_order.start, sales_order.end, tenant_id)
# Transform the query result into a billable dict.
tenant_dict = build_tenant_dict(valid_tenant, usage, db)
tenant_dict = add_costs_for_tenant(tenant_dict, rates)
# add sales order range:
tenant_dict['start'] = str(sales_order.start)
tenant_dict['end'] = str(sales_order.end)
tenants.append(tenant_dict)
return 200, tenants
@app.route("sales_order", methods=["POST"])
@json_must()
@returns_json
def run_sales_order_generation():
"""Generates a sales order for the given tenant.
-end: a given end date, or uses default"""
tenant_id = flask.request.json.get("tenant", None)
end = flask.request.json.get("end", None)
if not end:
        # default to the start of today (UTC)
end = datetime.utcnow().\
replace(hour=0, minute=0, second=0, microsecond=0)
else:
try:
end = datetime.strptime(end, iso_date)
except ValueError:
return 400, {"errors": ["'end' date given needs to be in format:" +
" y-m-d"]}
return generate_sales_order(False, tenant_id, end)
@app.route("sales_draft", methods=["POST"])
@json_must()
@returns_json
def run_sales_draft_generation():
"""Generates a sales draft for the given tenant.
-end: a given end datetime, or uses default"""
tenant_id = flask.request.json.get("tenant", None)
end = flask.request.json.get("end", None)
if not end:
end = datetime.utcnow()
else:
try:
end = datetime.strptime(end, iso_date)
except ValueError:
try:
end = datetime.strptime(end, iso_time)
except ValueError:
return 400, {
"errors": ["'end' date given needs to be in format: " +
"y-m-d, or y-m-dTH:M:S"]}
return generate_sales_order(True, tenant_id, end)
@app.route("sales_historic", methods=["POST"])
@json_must()
@returns_json
def run_sales_historic_generation():
"""Returns the sales order that intersects with the given target date.
-target: a given target date"""
tenant_id = flask.request.json.get("tenant", None)
target = flask.request.json.get("date", None)
if target is not None:
try:
target = datetime.strptime(target, iso_date)
except ValueError:
return 400, {"errors": ["date given needs to be in format: " +
"y-m-d"]}
else:
return 400, {"missing parameter": {"date": "target date in format: " +
"y-m-d"}}
return regenerate_sales_order(tenant_id, target)
@app.route("sales_range", methods=["POST"])
@json_must()
@returns_json
def run_sales_historic_range_generation():
"""Returns the sales orders that intersect with the given date range.
-start: a given start for the range.
-end: a given end for the range, defaults to now."""
tenant_id = flask.request.json.get("tenant", None)
start = flask.request.json.get("start", None)
end = flask.request.json.get("end", None)
try:
if start is not None:
start = datetime.strptime(start, iso_date)
else:
return 400, {"missing parameter": {"start": "start date" +
" in format: y-m-d"}}
if end is not None:
end = datetime.strptime(end, iso_date)
else:
end = datetime.utcnow()
except ValueError:
return 400, {"errors": ["dates given need to be in format: " +
"y-m-d"]}
return regenerate_sales_order_range(tenant_id, start, end)
if __name__ == '__main__':
pass
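
Assuming a running instance, a draft sales order can be requested with a
POST like the following (host, port, and tenant id are illustrative):

    import json
    import requests

    resp = requests.post(
        "http://localhost:8000/sales_draft",
        # json_must() rejects anything without this content type
        headers={"Content-Type": "application/json"},
        data=json.dumps({"tenant": "some-tenant-uuid", "end": "2014-06-01"}))
    print(resp.status_code, resp.json())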

distil/auth.py (new file, +40)

@@ -0,0 +1,40 @@
import requests
import json
import urllib
import config
# Provides authentication against OpenStack
from keystoneclient.v2_0 import client as KeystoneClient
class NotFound(Exception):
    pass
class Keystone(KeystoneClient.Client):
def tenant_by_name(self, name):
authenticator = self.auth_url
url = "%(url)s/tenants?%(query)s" % {
"url": authenticator,
"query": urllib.urlencode({"name": name})
}
r = requests.get(url, headers={
"X-Auth-Token": self.auth_token,
"Content-Type": "application/json"
})
if r.ok:
data = json.loads(r.text)
assert data
return data
        else:
            if r.status_code == 404:
                # couldn't find it
                raise NotFound
            # any other failure is unexpected; surface it rather than
            # silently returning None
            r.raise_for_status()
def get_ceilometer_endpoint(self):
endpoint = self.service_catalog.url_for(
service_type="metering",
endpoint_type="adminURL",
region_name=config.main['region'])
return endpoint
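
A sketch of constructing the wrapper and looking a tenant up by name
(credentials and endpoint are placeholders; config.auth normally
supplies them, as in distil/interface.py):

    from distil.auth import Keystone, NotFound

    keystone = Keystone(username="admin",
                        password="secret",
                        tenant_name="demo",
                        auth_url="http://keystone:5000/v2.0",
                        insecure=False)
    try:
        tenants = keystone.tenant_by_name("demo")
    except NotFound:
        tenants = None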

distil/config.py (new file, +20)

@@ -0,0 +1,20 @@
# This is simply a namespace for global config storage
main = None
rates_config = None
auth = None
collection = None
transformers = None
def setup_config(conf):
global main
main = conf['main']
global rates_config
rates_config = conf['rates_config']
global auth
auth = conf['auth']
global collection
collection = conf['collection']
global transformers
transformers = conf['transformers']
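
setup_config expects a dict with these five top-level keys. A minimal
illustrative example (values are placeholders, not a recommended
deployment config):

    from distil import config

    config.setup_config({
        'main': {'database_uri': 'sqlite://',
                 'log_file': 'distil.log',
                 'region': 'region-1'},
        'rates_config': {'file': 'rates.csv'},
        'auth': {'username': 'admin',
                 'password': 'secret',
                 'default_tenant': 'demo',
                 'end_point': 'http://keystone:5000/v2.0',
                 'insecure': False},
        'collection': {'meter_mappings': {}, 'metadata_def': {}},
        'transformers': {'uptime': {'tracked_states': ['active']}},
    })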

distil/constants.py (new file, +33)

@@ -0,0 +1,33 @@
from datetime import datetime
# Date format Ceilometer uses
# 2013-07-03T13:34:17
# which is, as an strftime:
# timestamp = datetime.strptime(res["timestamp"], "%Y-%m-%dT%H:%M:%S.%f")
# or
# timestamp = datetime.strptime(res["timestamp"], "%Y-%m-%dT%H:%M:%S")
# Most of the time we use date_format
date_format = "%Y-%m-%dT%H:%M:%S"
# Sometimes things also have milliseconds, so we look for that too.
# Because why not be annoying in all the ways?
other_date_format = "%Y-%m-%dT%H:%M:%S.%f"
# Some useful constants
iso_time = "%Y-%m-%dT%H:%M:%S"
iso_date = "%Y-%m-%d"
dawn_of_time = datetime(2014, 4, 1)
# VM states:
states = {'active': 1,
'building': 2,
'paused': 3,
'suspended': 4,
'stopped': 5,
'rescued': 6,
'resized': 7,
'soft_deleted': 8,
'deleted': 9,
'error': 10,
'shelved': 11,
'shelved_offloaded': 12}
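
The two timestamp formats above are usually handled with a try/except
fallback, as transformers.py does inline; a standalone sketch:

    from datetime import datetime
    from distil.constants import date_format, other_date_format

    def parse_timestamp(ts):
        """Parse a Ceilometer timestamp, with or without microseconds."""
        try:
            return datetime.strptime(ts, date_format)
        except ValueError:
            return datetime.strptime(ts, other_date_format)

    parse_timestamp("2013-07-03T13:34:17")         # plain
    parse_timestamp("2013-07-03T13:34:17.123456")  # with microseconds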

distil/database.py (new file, +124)

@@ -0,0 +1,124 @@
from sqlalchemy import func
from .models import Resource, UsageEntry, Tenant, SalesOrder, _Last_Run
from distil.constants import dawn_of_time
from datetime import timedelta
import json
import config
import logging as log
class Database(object):
def __init__(self, session):
self.session = session
def insert_tenant(self, tenant_id, tenant_name, metadata, timestamp):
"""If a tenant exists does nothing,
and if it doesn't, creates and inserts it."""
# Have we seen this tenant before?
query = self.session.query(Tenant).\
filter(Tenant.id == tenant_id)
if query.count() == 0:
last_run = self.session.query(_Last_Run)
if last_run.count() == 0:
start = dawn_of_time
else:
# start equals the last run, minus an hour
# to ensure no data is missed
start = last_run[0].last_run - timedelta(hours=1)
tenant = Tenant(id=tenant_id,
info=metadata,
name=tenant_name,
created=timestamp,
last_collected=start
)
self.session.add(tenant)
self.session.flush() # can't assume deferred constraints.
return tenant
else:
return query[0]
def insert_resource(self, tenant_id, resource_id, resource_type,
timestamp, entry):
"""If a given resource does not exist, creates it,
otherwise merges the metadata with the new entry."""
query = self.session.query(Resource).\
filter(Resource.id == resource_id,
Resource.tenant_id == tenant_id)
if query.count() == 0:
info = self.merge_resource_metadata({'type': resource_type}, entry)
self.session.add(Resource(
id=resource_id,
info=json.dumps(info),
tenant_id=tenant_id,
created=timestamp))
self.session.flush() # can't assume deferred constraints.
else:
md_dict = json.loads(query[0].info)
md_dict = self.merge_resource_metadata(md_dict, entry)
query[0].info = json.dumps(md_dict)
def insert_usage(self, tenant_id, resource_id, entries, unit,
start, end, timestamp):
"""Inserts all given entries into the database."""
for service, volume in entries.items():
entry = UsageEntry(
service=service,
volume=volume,
unit=unit,
resource_id=resource_id,
tenant_id=tenant_id,
start=start,
end=end,
created=timestamp)
self.session.add(entry)
log.debug(entry)
def usage(self, start, end, tenant_id):
"""Returns a query of usage entries for a given tenant,
in the given range.
start, end: define the range to query
tenant: a tenant entry (tenant_id for now)"""
# build a query set in the format:
# tenant_id | resource_id | service | unit | sum(volume)
query = self.session.query(UsageEntry.tenant_id,
UsageEntry.resource_id,
UsageEntry.service,
UsageEntry.unit,
func.sum(UsageEntry.volume).label("volume")).\
filter(UsageEntry.start >= start, UsageEntry.end <= end).\
filter(UsageEntry.tenant_id == tenant_id).\
group_by(UsageEntry.tenant_id, UsageEntry.resource_id,
UsageEntry.service, UsageEntry.unit)
return query
def get_resource_metadata(self, resource_id):
"""Gets the metadata for a resource and loads it into a dict."""
info = self.session.query(Resource.info).\
filter(Resource.id == resource_id)
return json.loads(info[0].info)
def get_sales_orders(self, tenant_id, start, end):
"""Returns a query with all sales orders
for a tenant in the given range."""
query = self.session.query(SalesOrder).\
filter(SalesOrder.start <= end, SalesOrder.end >= start).\
filter(SalesOrder.tenant_id == tenant_id)
return query
def merge_resource_metadata(self, md_dict, entry):
"""Strips metadata from the entry as defined in the config,
and merges it with the given metadata dict."""
fields = config.collection['metadata_def'].get(md_dict['type'], {})
        for field, sources in fields.iteritems():
            for source in sources:
                try:
                    md_dict[field] = entry['resource_metadata'][source]
                    break
                except KeyError:
                    # Just means we haven't found the right value yet.
                    pass
return md_dict
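
How the metadata merge behaves, given a metadata_def that lists candidate
source fields in priority order (config values here are illustrative;
merge_resource_metadata never touches the session, so None suffices):

    from distil import config
    from distil.database import Database

    config.collection = {'metadata_def': {
        'volume': {'name': ['display_name', 'name']}}}

    db = Database(None)
    entry = {'resource_metadata': {'name': 'vol-1'}}  # no 'display_name'
    db.merge_resource_metadata({'type': 'volume'}, entry)
    # -> {'type': 'volume', 'name': 'vol-1'}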

distil/helpers.py (new file, +45)

@@ -0,0 +1,45 @@
from novaclient.v1_1 import client
from decimal import Decimal
import config
import math
cache = {}
def flavor_name(f_id):
"""Grabs the correct flavor name from Nova given the correct ID."""
if f_id not in cache:
nova = client.Client(
config.auth['username'],
config.auth['password'],
config.auth['default_tenant'],
config.auth['end_point'],
service_type="compute",
insecure=config.auth['insecure'])
cache[f_id] = nova.flavors.get(f_id).name
return cache[f_id]
def to_gigabytes_from_bytes(value):
"""From Bytes, unrounded."""
return ((value / Decimal(1024)) / Decimal(1024)) / Decimal(1024)
def to_hours_from_seconds(value):
"""From seconds to rounded hours"""
return Decimal(math.ceil((value / Decimal(60)) / Decimal(60)))
conversions = {'byte': {'gigabyte': to_gigabytes_from_bytes},
'second': {'hour': to_hours_from_seconds}}
def convert_to(value, from_unit, to_unit):
"""Converts a given value to the given unit.
Assumes that the value is in the lowest unit form,
of the given unit (seconds or bytes).
e.g. if the unit is gigabyte we assume the value is in bytes"""
if from_unit == to_unit:
return value
return conversions[from_unit][to_unit](value)
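
Worked examples of the conversion table (note that the ceiling in
to_hours_from_seconds means any partial hour bills as a full hour):

    from decimal import Decimal
    from distil.helpers import convert_to

    convert_to(Decimal(3600), 'second', 'hour')          # Decimal('1')
    convert_to(Decimal(3601), 'second', 'hour')          # Decimal('2')
    convert_to(Decimal(1073741824), 'byte', 'gigabyte')  # Decimal('1')
    convert_to(5, 'hour', 'hour')                        # 5, no conversion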

distil/initdb.py (new file, +31)

@@ -0,0 +1,31 @@
from models import Base, __VERSION__
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool
def provision(engine):
Base.metadata.create_all(bind=engine)
if __name__ == '__main__':
import argparse
a = argparse.ArgumentParser()
a.add_argument("--host", "--host")
a.add_argument("-p", "--port")
a.add_argument("-u", "--user")
a.add_argument("-d", "--database")
a.add_argument("-P", "--provider")
a.add_argument("-w", "--password")
args = a.parse_args()
conn_string = "{provider}://{user}:{password}@{host}/{database}".format(
host=args.host,
port=args.port,
provider=args.provider,
user=args.user,
password=args.password,
database=args.database)
engine = create_engine(conn_string, poolclass=NullPool)
provision(engine)
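
provision() only needs an engine, so the schema can be exercised against
an in-memory SQLite database (the MySQL/Postgres triggers in
distil.models are gated with execute_if and simply don't fire here):

    from sqlalchemy import create_engine
    from distil.initdb import provision

    engine = create_engine("sqlite://")
    provision(engine)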

distil/interface.py (new file, +113)

@@ -0,0 +1,113 @@
import requests
import json
import auth
from constants import date_format
import config
from datetime import timedelta, datetime
from contextlib import contextmanager
import logging as log
@contextmanager
def timed(desc):
start = datetime.utcnow()
yield
end = datetime.utcnow()
log.debug("%s: %s" % (desc, end - start))
class Interface(object):
"""Interface for talking to openstack components."""
def __init__(self):
self.session = requests.Session()
# This is the Keystone client connection, which provides our
# OpenStack authentication
self.auth = auth.Keystone(
username=config.auth["username"],
password=config.auth["password"],
tenant_name=config.auth["default_tenant"],
auth_url=config.auth["end_point"],
insecure=config.auth["insecure"]
)
@property
def tenants(self):
"""All the tenants as known by keystone."""
with timed("fetch tenant list from keystone"):
_tenants = self.auth.tenants.list()
tenants = []
for tenant in _tenants:
            # if this tenant is in the ignore_tenants list, then just
            # pretend it doesn't exist at all.
if tenant.name not in config.main.get('ignore_tenants', []):
t = Tenant(tenant, self)
tenants.append(t)
else:
log.debug("Ignored tenant %s (%s) due to config." %
(tenant.id, tenant.name))
return tenants
class InterfaceException(Exception):
pass
window_leadin = timedelta(minutes=10)
def add_dates(start, end):
return [
{
"field": "timestamp",
"op": "ge",
"value": start.strftime(date_format)
},
{
"field": "timestamp",
"op": "lt",
"value": end.strftime(date_format)
}
]
class Tenant(object):
"""A wrapper object for the tenant recieved from keystone."""
def __init__(self, tenant, conn):
self.tenant = tenant
self.conn = conn # the Interface object that produced us.
@property
def id(self):
return self.tenant.id
@property
def name(self):
return self.tenant.name
@property
def description(self):
return self.tenant.description
def usage(self, meter_name, start, end):
"""Queries ceilometer for all the entries in a given range,
for a given meter, from this tenant."""
fields = [{'field': 'project_id', 'op': 'eq', 'value': self.tenant.id}]
fields.extend(add_dates(start - window_leadin, end))
with timed('fetch global usage for meter %s' % meter_name):
endpoint = self.conn.auth.get_ceilometer_endpoint()
r = self.conn.session.get(
'%s/v2/meters/%s' % (endpoint, meter_name),
headers={
"X-Auth-Token": self.conn.auth.auth_token,
"Content-Type": "application/json"
},
data=json.dumps({'q': fields}))
if r.status_code == 200:
return json.loads(r.text)
else:
raise InterfaceException('%d %s' % (r.status_code, r.text))
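
add_dates builds the Ceilometer query filter for a half-open window
[start, end); timed simply logs the elapsed time at debug level:

    from datetime import datetime
    from distil.interface import add_dates, timed

    add_dates(datetime(2014, 6, 1), datetime(2014, 6, 1, 1))
    # [{'field': 'timestamp', 'op': 'ge', 'value': '2014-06-01T00:00:00'},
    #  {'field': 'timestamp', 'op': 'lt', 'value': '2014-06-01T01:00:00'}]

    with timed("example block"):
        sum(range(1000))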

distil/models/__init__.py (new file, +304)

@@ -0,0 +1,304 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Text, DateTime, Numeric, ForeignKey
from sqlalchemy import event, DDL, String, Integer
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
from sqlalchemy.orm import relationship
from sqlalchemy.schema import ForeignKeyConstraint
# Version digit.
__VERSION__ = 1.0
Base = declarative_base()
class _Version(Base):
"""
A model that knows what version we are, stored in the DB.
"""
__tablename__ = "distil_database_version"
id = Column(String(10), primary_key=True)
class _Last_Run(Base):
"""Model to store time of last completed usage run."""
__tablename__ = "distil_last_run"
id = Column(Integer, primary_key=True)
last_run = Column(DateTime, nullable=False)
class Resource(Base):
"""Database model for storing metadata associated with a resource."""
__tablename__ = 'resources'
id = Column(String(100), primary_key=True)
tenant_id = Column(String(100), ForeignKey("tenants.id"), primary_key=True)
info = Column(Text)
created = Column(DateTime, nullable=False)
class UsageEntry(Base):
"""Simplified data store of usage information for a given service,
in a resource, in a tenant. Similar to ceilometer datastore,
but stores local transformed data."""
__tablename__ = 'usage'
    # Service covers things like incoming vs. outgoing traffic,
    # as well as instance flavour
service = Column(String(100), primary_key=True)
unit = Column(String(100))
volume = Column(Numeric(precision=20, scale=2), nullable=False)
resource_id = Column(String(100), primary_key=True)
tenant_id = Column(String(100), primary_key=True)
start = Column(DateTime, nullable=False, primary_key=True)
end = Column(DateTime, nullable=False, primary_key=True)
created = Column(DateTime, nullable=False)
resource = relationship(Resource,
primaryjoin=(resource_id == Resource.id))
tenant = relationship(Resource,
primaryjoin=(tenant_id == Resource.tenant_id))
__table_args__ = (ForeignKeyConstraint(
["resource_id", "tenant_id"],
["resources.id", "resources.tenant_id"],
name="fk_resource_constraint"
),
)
@hybrid_property
def length(self):
return self.end - self.start
@hybrid_method
def intersects(self, other):
return (self.start <= other.end and other.start <= self.end)
def __str__(self):
return '<UsageEntry {tenant_id=%s resource_id=%s service=%s start=%s end=%s volume=%s}>' % (
self.tenant_id, self.resource_id, self.service,
self.start, self.end, self.volume)
class Tenant(Base):
"""Model for storage of metadata related to a tenant."""
__tablename__ = 'tenants'
# ID is a uuid
id = Column(String(100), primary_key=True, nullable=False)
name = Column(Text, nullable=False)
info = Column(Text)
created = Column(DateTime, nullable=False)
last_collected = Column(DateTime, nullable=False)
resources = relationship(Resource, backref="tenant")
class SalesOrder(Base):
"""Historic billing periods so that tenants
cannot be rebilled accidentally."""
__tablename__ = 'sales_orders'
id = Column(Integer, primary_key=True)
tenant_id = Column(
String(100),
ForeignKey("tenants.id"),
primary_key=True)
start = Column(DateTime, nullable=False, primary_key=True)
end = Column(DateTime, nullable=False, primary_key=True)
tenant = relationship("Tenant")
@hybrid_property
def length(self):
return self.end - self.start
@hybrid_method
def intersects(self, other):
return (self.start <= other.end and other.start <= self.end)
# Create a trigger in MySQL that enforces our range overlap constraints,
# since MySQL lacks a native range overlap type.
# MySQL trigger:
mysql_table_triggers = {
UsageEntry.__table__: """
CREATE TRIGGER %(table)s_%(funcname)s_range_constraint
BEFORE %(type)s ON `%(table)s`
FOR EACH ROW
BEGIN
DECLARE existing INT;
SET existing = ( SELECT COUNT(*) FROM `%(table)s` t
WHERE ( NEW.start < t.end
AND t.start < NEW.end )
AND service = NEW.service
AND tenant_id = NEW.tenant_id
AND resource_id = NEW.resource_id );
IF existing > 0 THEN
SET NEW.start = NULL;
SET NEW.end = NULL;
END IF;
END;""",
SalesOrder.__table__: """
CREATE TRIGGER %(table)s_%(funcname)s_range_constraint
BEFORE %(type)s ON `%(table)s`
FOR EACH ROW
BEGIN
DECLARE existing INT;
SET existing = ( SELECT COUNT(*) FROM `%(table)s` t
WHERE ( NEW.start < t.end
AND t.start < NEW.end )
AND tenant_id = NEW.tenant_id );
IF existing > 0 THEN
SET NEW.start = NULL;
SET NEW.end = NULL;
END IF;
END;
"""
}
# before insert
funcmaps = {"INSERT": "entry", "UPDATE": "change"}
for table in (SalesOrder.__table__, UsageEntry.__table__):
for type_ in ("INSERT", "UPDATE"):
event.listen(
table,
"after_create",
DDL(mysql_table_triggers[table] % {
"table": table,
"type": type_,
"funcname": funcmaps[type_]}).
execute_if(dialect="mysql"))
# And the postgres constraints
# Ideally this would use Postgres' exclusion constraints and a TSRange type.
# This is currently not feasible because I can't find a way to emit different
# DDL for MySQL and Postgres to support the varying concepts
# (single vs. dual columns).
pgsql_trigger_funcs = {
UsageEntry.__table__: """
CREATE FUNCTION %(table)s_exclusion_constraint_trigger() RETURNS trigger AS $trigger$
DECLARE
existing INTEGER = 0;
BEGIN
SELECT count(*) INTO existing FROM %(table)s t
WHERE t.service = NEW.service
AND t.tenant_id = NEW.tenant_id
AND t.resource_id = NEW.resource_id
AND ( NEW.start < t."end"
AND t.start < NEW."end" );
IF existing > 0 THEN
RAISE SQLSTATE '23P01';
RETURN NULL;
END IF;
RETURN NEW;
END;
$trigger$ LANGUAGE PLPGSQL;""",
SalesOrder.__table__: """
CREATE FUNCTION %(table)s_exclusion_constraint_trigger() RETURNS trigger AS $trigger$
DECLARE
existing INTEGER = 0;
BEGIN
SELECT count(*) INTO existing FROM %(table)s t
WHERE t.tenant_id = NEW.tenant_id
AND ( NEW.start < t."end"
AND t.start < NEW."end" );
IF existing > 0 THEN
RAISE SQLSTATE '23P01';
RETURN NULL;
END IF;
RETURN NEW;
END;
$trigger$ LANGUAGE PLPGSQL;"""
}
pgsql_trigger = """
CREATE TRIGGER %(table)s_exclusion_trigger BEFORE INSERT OR UPDATE ON %(table)s
FOR EACH ROW EXECUTE PROCEDURE %(table)s_exclusion_constraint_trigger();
"""
for table in (UsageEntry.__table__, SalesOrder.__table__):
event.listen(
table,
"after_create",
DDL(pgsql_trigger_funcs[table] % {
"table": table
}).execute_if(dialect="postgresql")
)
event.listen(
table,
"after_create",
DDL(pgsql_trigger % {
"table": table
}
).execute_if(dialect="postgresql")
)
# Create the PGSQL secondary trigger for sales order overlaps, for
# the usage entry
pgsql_secondary_trigger = """
CREATE TRIGGER %(table)s_secondary_exclusion_trigger BEFORE INSERT OR UPDATE ON %(table)s
FOR EACH ROW EXECUTE PROCEDURE %(secondary_table)s_exclusion_constraint_trigger();
"""
event.listen(
UsageEntry.__table__,
"after_create",
DDL(pgsql_secondary_trigger % {
"table": UsageEntry.__table__,
"secondary_table": SalesOrder.__table__
}).execute_if(dialect="postgresql")
)
event.listen(
UsageEntry.__table__,
"before_drop",
DDL("""DROP TRIGGER %(table)s_secondary_exclusion_trigger ON %(table)s""" %
{"table": UsageEntry.__table__,
"secondary_table": SalesOrder.__table__
}).execute_if(dialect="postgresql")
)
event.listen(
UsageEntry.__table__,
"before_drop",
DDL("DROP TRIGGER %(table)s_exclusion_trigger ON %(table)s" %
{"table": UsageEntry.__tablename__}).execute_if(dialect="postgresql")
)
event.listen(
UsageEntry.__table__,
"before_drop",
DDL("DROP FUNCTION %s_exclusion_constraint_trigger()" %
UsageEntry.__tablename__).execute_if(dialect="postgresql")
)
event.listen(
    SalesOrder.__table__,
    "before_drop",
    DDL("DROP TRIGGER %(table)s_exclusion_trigger ON %(table)s" % {
        "table": SalesOrder.__tablename__}).execute_if(dialect="postgresql")
)
event.listen(
    SalesOrder.__table__,
    "before_drop",
    DDL("DROP FUNCTION %s_exclusion_constraint_trigger()" %
        SalesOrder.__tablename__).execute_if(dialect="postgresql")
)
def insert_into_version(target, connection, **kw):
connection.execute("INSERT INTO %s (id) VALUES (%s)" %
(target.name, __VERSION__))
event.listen(
_Version.__table__,
"after_create",
insert_into_version
)
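
A short sketch exercising the models against in-memory SQLite (the
dialect-gated triggers above don't fire there, but the schema and the
version-row insert do):

    from datetime import datetime
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from distil.models import Base, Tenant

    engine = create_engine("sqlite://")
    Base.metadata.create_all(bind=engine)
    session = sessionmaker(bind=engine)()
    session.add(Tenant(id="t1", name="demo",
                       created=datetime.utcnow(),
                       last_collected=datetime.utcnow()))
    session.commit()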

distil/rates.py (new file, +47)

@@ -0,0 +1,47 @@
from decimal import Decimal
import csv
import logging as log
class RatesManager(object):
def __init__(self, config):
self.config = config
def rate(self, name, region=None):
raise NotImplementedError("Not implemented in base class")
class RatesFile(RatesManager):
def rate(self, name, region=None):
        # lazily build the rates cache on first use
        try:
            self.__rates
        except AttributeError:
            self.__rates = {}
        if not self.__rates:
try:
with open(self.config['file']) as fh:
                    # Makes no assumptions about the file structure
                    # beyond the delimiter
reader = csv.reader(fh, delimiter="|")
for row in reader:
# The default layout is expected to be:
# location | rate name | rate measurement | rate value
self.__rates[row[1].strip()] = {
"rate": Decimal(row[3].strip()),
"region": row[0].strip(),
"unit": row[2].strip()
}
if not self.__rates:
raise IndexError("Malformed rates CSV!")
except KeyError:
# couldn't actually find the useful info for rates?
log.critical("Couldn't find rates info configuration option!")
raise
except IndexError:
raise IndexError("Malformed rates CSV!")
except IOError:
log.critical("Couldn't open the file!")
raise
return {'rate': self.__rates[name]["rate"],
'unit': self.__rates[name]["unit"]}

distil/transformers.py (new file, +110)

@@ -0,0 +1,110 @@
import datetime
import constants
import helpers
import config
class Transformer(object):
def transform_usage(self, name, data, start, end):
return self._transform_usage(name, data, start, end)
def _transform_usage(self, name, data, start, end):
raise NotImplementedError
class Uptime(Transformer):
"""
Transformer to calculate uptime based on states,
which is broken apart into flavor at point in time.
"""
def _transform_usage(self, name, data, start, end):
# get tracked states from config
tracked = config.transformers['uptime']['tracked_states']
tracked_states = {constants.states[i] for i in tracked}
usage_dict = {}
def sort_and_clip_end(usage):
cleaned = (self._clean_entry(s) for s in usage)
clipped = (s for s in cleaned if s['timestamp'] < end)
return sorted(clipped, key=lambda x: x['timestamp'])
state = sort_and_clip_end(data)
        if not state:
# there was no data for this period.
return usage_dict
last_state = state[0]
if last_state['timestamp'] >= start:
last_timestamp = last_state['timestamp']
seen_sample_in_window = True
else:
last_timestamp = start
seen_sample_in_window = False
def _add_usage(diff):
flav = last_state['flavor']
usage_dict[flav] = usage_dict.get(flav, 0) + diff.total_seconds()
for val in state[1:]:
if last_state["counter_volume"] in tracked_states:
diff = val["timestamp"] - last_timestamp
if val['timestamp'] > last_timestamp:
# if diff < 0 then we were looking back before the start
# of the window.
_add_usage(diff)
last_timestamp = val['timestamp']
seen_sample_in_window = True
last_state = val
# extend the last state we know about, to the end of the window,
# if we saw any actual uptime.
if (end and last_state['counter_volume'] in tracked_states
and seen_sample_in_window):
diff = end - last_timestamp
_add_usage(diff)
# map the flavors to names on the way out
return {helpers.flavor_name(f): v for f, v in usage_dict.items()}
def _clean_entry(self, entry):
result = {
'counter_volume': entry['counter_volume'],
'flavor': entry['resource_metadata'].get(
'flavor.id', entry['resource_metadata'].get(
'instance_flavor_id', 0
)
)
}
try:
result['timestamp'] = datetime.datetime.strptime(
entry['timestamp'], constants.date_format)
except ValueError:
result['timestamp'] = datetime.datetime.strptime(
entry['timestamp'], constants.other_date_format)
return result
class GaugeMax(Transformer):
"""
Transformer for max-integration of a gauge value over time.
If the raw unit is 'gigabytes', then the transformed unit is
'gigabyte-hours'.
"""
def _transform_usage(self, name, data, start, end):
max_vol = max([v["counter_volume"] for v in data]) if len(data) else 0
hours = (end - start).total_seconds() / 3600.0
return {name: max_vol * hours}
# Transformer dict for use with the config.
# All usable transformers need to be here.
active_transformers = {
'Uptime': Uptime,
'GaugeMax': GaugeMax
}
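
A worked GaugeMax example with synthetic samples: the peak reading in
the window is multiplied by the window length in hours, so 30 gigabytes
held across a one-hour window yields 30 gigabyte-hours:

    from datetime import datetime
    from distil.transformers import GaugeMax

    samples = [{'counter_volume': 20}, {'counter_volume': 30}]
    GaugeMax().transform_usage('gigabytes', samples,
                               datetime(2014, 6, 1, 0),
                               datetime(2014, 6, 1, 1))
    # -> {'gigabytes': 30.0}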