# Interfaces to the Ceilometer API
|
|
# import ceilometer
|
|
|
|
# Brings in HTTP support
|
|
import requests
|
|
import json
|
|
import urllib
|
|
|
|
from copy import copy
|
|
from collections import defaultdict
|
|
|
|
#
|
|
import datetime
|
|
|
|
# Provides authentication against Openstack
|
|
from keystoneclient.v2_0 import client as KeystoneClient
|
|
|
|
# Provides hooks to ceilometer, which we need for data.
|
|
from ceilometerclient.v2.client import Client as ceilometer
|
|
|
|
from sqlalchemy import create_engine
|
|
|
|
# from .models.usage import Usage
|
|
from .models import Session, resources, tenants, usage
|
|
|
|
# from .models.tenants import Tenant
|
|
|
|
# Date format Ceilometer uses, e.g.
#   2013-07-03T13:34:17
# which, parsed with strptime, is either:
#   timestamp = datetime.strptime(res["timestamp"], "%Y-%m-%dT%H:%M:%S.%f")
# or
#   timestamp = datetime.strptime(res["timestamp"], "%Y-%m-%dT%H:%M:%S")

# Most of the time we use date_format (whole seconds only).
date_format = "%Y-%m-%dT%H:%M:%S"
# Sometimes timestamps also carry fractional seconds (%f), so we look for
# that format too.
other_date_format = "%Y-%m-%dT%H:%M:%S.%f"
|
|
|
|
|
|
def get_meter(meter, start, end, auth):
    """Fetch the samples of a meter, restricted to a time window.

    :param meter: object whose ``link`` attribute is the meter's href.  The
        href is expected to already carry query fields of its own, which is
        why the time filters are appended with ``&`` (TODO confirm for all
        callers).
    :param start: datetime; only samples with ``timestamp >= start`` are
        requested.
    :param end: datetime; only samples with ``timestamp < end`` are
        requested.
    :param auth: token sent in the ``X-Auth-Token`` header.
    :returns: the decoded JSON body of the HTTP response.
    """
    date_fields = [
        {
            "field": "timestamp",
            "op": "ge",
            "value": start.strftime(date_format),
        },
        {
            "field": "timestamp",
            "op": "lt",
            "value": end.strftime(date_format),
        },
    ]
    # Flatten the filters into repeated q.field / q.op / q.value parameters.
    fields = []
    for field in date_fields:
        fields.append(("q.field", field["field"]))
        fields.append(("q.op", field["op"]))
        fields.append(("q.value", field["value"]))

    # Combine the meter's own href with the time-window filters.
    url = "&".join((meter.link, urllib.urlencode(fields)))

    # Request the filtered URL.  The bare meter.link used to be requested
    # here, which silently dropped the start/end restriction.
    r = requests.get(
        url,
        headers={
            "X-Auth-Token": auth,
            "Content-Type": "application/json",
        }
    )
    return json.loads(r.text)
|
|
|
|
|
|
class NotFound(BaseException):
    """Signals that a looked-up entity (such as a tenant) does not exist."""
|
|
|
|
|
|
class keystone(KeystoneClient.Client):
    """Keystone client extended with a direct tenant-by-name lookup."""

    def tenant_by_name(self, name):
        """Look a tenant up by name through the Keystone HTTP API.

        :param name: the tenant name to search for.
        :returns: the decoded JSON body of the response.
        :raises NotFound: if Keystone answers 404 or returns an empty body.
        :raises requests.HTTPError: for any other unsuccessful response.
        """
        url = "%(url)s/tenants?%(query)s" % {
            "url": self.auth_url,
            "query": urllib.urlencode({"name": name}),
        }
        r = requests.get(url, headers={
            "X-Auth-Token": self.auth_token,
            "Content-Type": "application/json",
        })
        if r.status_code == 404:
            # couldn't find it
            raise NotFound
        # Any other failure used to fall through and return None, which
        # only surfaced later as an unrelated TypeError in the caller.
        r.raise_for_status()

        data = json.loads(r.text)
        if not data:
            # Explicit check instead of ``assert`` (stripped under -O).
            raise NotFound
        return data
|
|
|
|
|
|
class Artifice(object):
    """Produces billable artifacts"""

    def __init__(self, config):
        """Wire up Keystone, the database session and Ceilometer.

        :param config: nested mapping with "openstack", "database" and
            "ceilometer" sections.
        """
        super(Artifice, self).__init__()
        self.config = config

        # Keystone client connection: provides our OpenStack authentication.
        openstack = config["openstack"]
        self.auth = keystone(
            username=openstack["username"],
            password=openstack["password"],
            tenant_name=openstack["default_tenant"],
            auth_url=openstack["authentication_url"]
        )

        # Database session, bound to a Postgres engine built from config.
        database = config["database"]
        conn_dict = {
            key: database[key]
            for key in ("username", "password", "host", "port", "database")
        }
        conn_string = ('postgresql://%(username)s:%(password)s@' +
                       '%(host)s:%(port)s/%(database)s') % conn_dict
        Session.configure(bind=create_engine(conn_string))
        self.session = Session()
        self.artifice = None

        # The token is handed over as a callable because the ceilometer
        # client apparently wants to call it.
        self.ceilometer = ceilometer(
            self.config["ceilometer"]["host"],
            token=lambda: self.auth.auth_token
        )
        self._tenancy = None

    def host_to_dc(self, host):
        """
        :param host: The name to use.
        :type host: str.
        :returns: str -- The datacenter corresponding to this host.
        """
        # TODO: FIXME. Placeholder: every host maps to the same datacenter
        # until a real lookup (module injection?) is implemented.
        return "Data Center 1"

    def tenant(self, name):
        """
        Returns a Tenant object describing the specified Tenant by
        name, or raises a NotFound error.
        """
        # Direct name lookup through the Keystone API.
        found = self.auth.tenant_by_name(name)
        return Tenant(found["tenant"], self)

    @property
    def tenants(self):
        """All the tenants in our system"""
        if self._tenancy:
            return self._tenancy

        self._tenancy = {}
        for raw_tenant in self.auth.tenants.list():
            wrapped = Tenant(raw_tenant, self)
            self._tenancy[wrapped["name"]] = wrapped
        return self._tenancy
|
|
|
|
|
|
class Tenant(object):
    """Wraps a tenant record and exposes its invoices, resources and usage.

    The wrapped ``tenant`` may be a mapping or an attribute-style object;
    ``tenant[key]`` and ``tenant.key`` on this wrapper work for both.
    """

    def __init__(self, tenant, conn):
        """
        :param tenant: the underlying tenant record (mapping or object).
        :param conn: the Artifice object we were instanced from.
        """
        self.tenant = tenant
        self.conn = conn
        self._meters = set()
        self._resources = None
        # Resolved lazily from the Artifice config on first invoice() call.
        self.invoice_type = None

    def __getitem__(self, item):
        """Return ``item`` from the wrapped tenant.

        Attribute access is tried first, then item access.

        :raises KeyError: if the tenant offers neither.
        """
        try:
            return getattr(self.tenant, item)
        except AttributeError:
            pass
        try:
            return self.tenant[item]
        except (KeyError, TypeError):
            # TypeError: the wrapped tenant is not subscriptable at all.
            raise KeyError("No such key '%s' in tenant" % item)

    def __getattr__(self, attr):
        """Fall back to the wrapped tenant for unknown attributes.

        :raises AttributeError: if the wrapped tenant has no such entry.
        """
        if attr == "tenant":
            # ``tenant`` itself is missing (e.g. instance built without
            # __init__); bail out instead of recursing forever.
            raise AttributeError(attr)
        try:
            return self.tenant[attr]
        except KeyError:
            raise AttributeError(
                "'Tenant' object has no attribute '%s'" % attr)
        except TypeError:
            # Wrapped tenant is not a mapping; use plain attribute access.
            return getattr(self.tenant, attr)

    def invoice(self, start, end):
        """
        Creates a new Invoice.
        Invoices are an Artifice datamodel that represent a
        set of billable entries assigned to a client on a given Date.
        An Invoice offers very little of its own opinions,
        requiring a backend plugin to operate.

        The backend is named by the "invoice:object" entry of the "main"
        config section, in "module:callable" form, and is imported on the
        first call.

        @returns: invoice
        @raises AttributeError: if the configured value has no ":".
        """
        if self.invoice_type is None:
            invoice_type = self.conn.config["main"]["invoice:object"]
            if ":" not in invoice_type:
                raise AttributeError("Invoice configuration incorrect! %s" %
                                     invoice_type)
            module, call = invoice_type.split(":")
            _package = __import__(module, globals(), locals(), [call])
            self.invoice_type = getattr(_package, call)

        config = self.conn.config["invoice_object"]
        return self.invoice_type(self, start, end, config)

    def resources(self, start, end):
        """Return the Ceilometer resources belonging to this tenant.

        The result of the first non-empty query is cached.  ``start`` and
        ``end`` are accepted but not used to filter the query.
        """
        if not self._resources:
            query = [
                {
                    "field": "project_id",
                    "op": "eq",
                    # Go through __getitem__ so both mapping and
                    # attribute-style tenants are supported.
                    "value": self["id"],
                },
            ]
            self._resources = self.conn.ceilometer.resources.list(query)
        return self._resources

    def usage(self, start, end):
        """
        Usage is the meat of Artifice: sorts this tenant's resources into
        vms, networks, objects, volumes and ips and returns them wrapped in
        a Usage object for the given period.
        """
        vms = []
        networks = []
        ips = []
        storage = []
        volumes = []

        # A resource is classified by the meters it links to; the first
        # matching category wins.
        for resource in self.resources(start, end):
            rels = [link["rel"] for link in resource.links
                    if link["rel"] != 'self']
            if "storage.objects" in rels:
                storage.append(Resource(resource, self.conn))
            elif "network.incoming.bytes" in rels:
                networks.append(Resource(resource, self.conn))
            elif "volume" in rels:
                volumes.append(Resource(resource, self.conn))
            elif 'instance' in rels:
                vms.append(Resource(resource, self.conn))
            elif 'ip.floating' in rels:
                ips.append(Resource(resource, self.conn))

        region_tmpl = {
            "vms": vms,
            "networks": networks,
            "objects": storage,
            "volumes": volumes,
            "ips": ips,
        }
        return Usage(region_tmpl, start, end, self.conn)
|
|
|
|
|
|
class Usage(object):
    """
    Groups a tenant's resources ("vms", "objects", "volumes", "networks",
    "ips") for a billing period and exposes each group as lazily-built
    model objects.  Iterating a Usage yields the group names.
    """

    def __init__(self, contents, start, end, conn):
        """
        :param contents: dict mapping group name to a list of resources.
        :param start: start of the period.
        :param end: end of the period.
        :param conn: the Artifice object, used for host-to-datacenter
            lookups.
        """
        self.contents = contents
        self.start = start
        self.end = end
        self.conn = conn

        # Per-group caches, filled on first access of each property.
        self._vms = []
        self._objects = []
        self._volumes = []
        self._networks = []
        self._ips = []

    @property
    def vms(self):
        """VM models, each tagged with the datacenter of its host."""
        if not self._vms:
            vms = []
            for vm in self.contents["vms"]:
                VM = resources.VM(vm, self.start, self.end)
                VM.location = self.conn.host_to_dc(vm["metadata"]["host"])
                vms.append(VM)
            self._vms = vms
        return self._vms

    @property
    def objects(self):
        """Object-storage models."""
        if not self._objects:
            # Cache under _objects, the attribute the check above reads.
            # This used to be stored as _objs, so it was never reused.
            self._objects = [
                resources.Object(object_, self.start, self.end)
                for object_ in self.contents["objects"]
            ]
        return self._objects

    @property
    def networks(self):
        """Network models."""
        if not self._networks:
            self._networks = [
                resources.Network(obj, self.start, self.end)
                for obj in self.contents["networks"]
            ]
        return self._networks

    @property
    def ips(self):
        """Floating-IP models."""
        if not self._ips:
            self._ips = [
                resources.FloatingIP(obj, self.start, self.end)
                for obj in self.contents["ips"]
            ]
        return self._ips

    @property
    def volumes(self):
        """Volume models."""
        if not self._volumes:
            self._volumes = [
                resources.Volume(obj, self.start, self.end)
                for obj in self.contents["volumes"]
            ]
        return self._volumes

    def __iter__(self):
        """Iterate over the group names held in ``contents``."""
        # Returning ``self`` with a generator-function ``next`` never
        # produced keys; hand out a real iterator instead.
        return iter(self.contents)

    def next(self):
        """Legacy helper: return an iterator over the group names."""
        return iter(list(self.contents.keys()))

    def save(self):
        """
        Iterate the vms, objects and volumes; save them to DB.
        Networks and floating IPs are not saved here.
        """
        for vm in self.vms:
            vm.save()

        for obj in self.objects:
            obj.save()

        for vol in self.volumes:
            vol.save()
|
|
|
|
|
|
class Resource(object):
    """Wraps a Ceilometer resource and gives access to its meters."""

    def __init__(self, resource, conn):
        """
        :param resource: the underlying resource; must expose ``links``, a
            list of dicts with "rel" and "href" entries.
        :param conn: the Artifice object, passed on to created Meters.
        """
        self.resource = resource
        self.conn = conn
        # name -> the Meter most recently created by meter().
        self._meters = {}
        # Lazily-built list backing the ``meters`` property.  Kept apart
        # from _meters so the property cannot clobber the dict above.
        self._all_meters = None

    def meter(self, name, start, end):
        """Return a Meter for the link whose "rel" equals ``name``.

        :raises AttributeError: if the resource has no such meter.
        """
        for link in self.resource.links:
            if link["rel"] == name:
                m = Meter(self, link["href"], self.conn, start, end)
                self._meters[name] = m
                return m
        raise AttributeError("no such meter %s" % name)

    def __getitem__(self, name):
        """Expose attributes of the wrapped resource through item access.

        Note that a missing name raises AttributeError, not KeyError.
        """
        return getattr(self.resource, name)

    @property
    def meters(self):
        """List of Meters for every link except "self" (no time window)."""
        if self._all_meters is None:
            # Use attribute access and the link's href, exactly as meter()
            # does: get_meter() needs Meter.link to be a URL string.
            self._all_meters = [
                Meter(self, link["href"], self.conn)
                for link in self.resource.links
                if link["rel"] != "self"
            ]
        return self._all_meters
|
|
|
|
|
|
class Meter(object):
    """A single meter of a resource, optionally bound to a time window."""

    def __init__(self, resource, link, conn, start=None, end=None):
        """
        :param resource: the Resource this meter belongs to.
        :param link: the meter's href, as consumed by get_meter().
        :param conn: the Artifice object (supplies the auth token).
        :param start: optional start of the window.
        :param end: optional end of the window.
        """
        self.resource = resource
        self.link = link
        self.conn = conn
        self.start = start
        self.end = end

    def __getitem__(self, x):
        """Placeholder: indexing and slicing are not implemented yet."""
        return None

    def volume(self):
        """Return the artifact for this meter's own start/end window."""
        return self.usage(self.start, self.end)

    def usage(self, start, end):
        """
        Usage condenses the entirety of a meter into a single datapoint:
        A volume value that we can plot as a single number against previous
        usage for a given range.

        Fetches the samples between ``start`` and ``end`` and wraps them in
        the Artifact subclass matching their counter type.

        :raises RuntimeError: if the samples carry no counter type, more
            than one, or one that is not cumulative/gauge/delta.
        """
        measurements = get_meter(self, start, end, self.conn.auth.auth_token)

        self.measurements = defaultdict(list)
        types = set([a["counter_type"] for a in measurements])
        if len(types) > 1:
            # That's a big problem
            raise RuntimeError("Too many types for measurement!")
        if not types:
            raise RuntimeError("No types!")
        self.type = types.pop()

        # Composition style:
        #   resource.meter("cpu_util").usage(start, end) == artifact
        artifact_types = {
            "cumulative": Cumulative,
            "gauge": Gauge,
            "delta": Delta,
        }
        try:
            type_ = artifact_types[self.type]
        except KeyError:
            # Used to end up calling None(...) with an opaque TypeError.
            raise RuntimeError("Unknown counter type %r" % (self.type,))
        return type_(self.resource, measurements, start, end)

    def save(self):
        """Persist this meter's artifact for its own window.

        :raises AttributeError: unless both start and end are set.
        """
        # ``not self.start and self.end`` parsed as ``(not start) and end``
        # and let a meter with only ``start`` set slip through.
        if self.start is None or self.end is None:
            raise AttributeError("Needs start and end defined to save")
        self.volume().save()
|
|
|
|
|
|
class Artifact(object):
    """
    Provides base artifact controls; generic typing information
    for the artifact structures.
    """

    def __init__(self, resource, usage, start, end):
        """
        :param resource: the Resource the samples belong to.
        :param usage: iterable of sample dicts (each has "counter_volume").
        :param start: start of the period.
        :param end: end of the period.
        """
        self.resource = resource
        self.usage = usage
        self.start = start
        self.end = end

    def __getitem__(self, item):
        """Return ``item`` from ``_data`` if a subclass provides it.

        :raises KeyError: if there is no such item.
        """
        # _data is never set in this class; treat its absence as empty so
        # the documented KeyError is raised instead of AttributeError.
        data = getattr(self, "_data", None) or {}
        if item in data:
            return data[item]
        raise KeyError("no such item %s" % item)

    def save(self):
        """
        Persists to our database backend.
        Opinionatedly this is a sql datastore.

        Creates the tenant and resource rows when they do not exist yet,
        then adds a usage row for this artifact's volume and commits.
        """
        value = self.volume()
        session = self.resource.conn.session

        # Resource.__getitem__ is attribute-backed and raises
        # AttributeError for a missing name, so catch that as well.
        try:
            tenant_id = self.resource["tenant_id"]
        except (KeyError, AttributeError):
            tenant_id = self.resource["project_id"]
        resource_id = self.resource["resource_id"]

        tenant = session.query(tenants.Tenant).get(tenant_id)
        res = None
        if tenant is None:
            tenant = tenants.Tenant()
            tenant.id = tenant_id
            session.add(tenant)
        else:
            try:
                matching = resources.Resource.id == resource_id
                res = session.query(resources.Resource).filter(matching)[0]
                tenant = res.tenant
            except IndexError:
                # Known tenant, new resource: keep the tenant that was
                # found.  A second Tenant with the same id used to be
                # created and added here.
                res = None

        if res is None:
            res = resources.Resource()
            res.id = resource_id
            res.tenant = tenant
            session.add(res)

        this_usage = usage.Usage(
            res,
            tenant,
            value,
            self.start,
            self.end,
        )
        session.add(this_usage)
        session.commit()  # Persist to Postgres

    def volume(self):
        """
        Default billable number for this volume: the sum of all
        "counter_volume" values.
        """
        return sum([x["counter_volume"] for x in self.usage])
|
|
|
|
|
|
class Cumulative(Artifact):
    """Artifact for counters that only grow, apart from resets to zero."""

    def volume(self):
        """Return how much the counter advanced over the samples.

        Samples are ordered by timestamp.  Whenever a reading is lower than
        the one before it, the counter is taken to have been reset and the
        value reached before the reset is banked.  With two or more samples
        the first reading is subtracted as the baseline.

        :returns: the computed volume; 0 when there are no samples.
        """
        measurements = sorted(self.usage, key=lambda x: x["timestamp"])
        if not measurements:
            # Nothing measured; previously an IndexError.
            return 0

        usage = 0
        last_measure = None
        for measure in measurements:
            if last_measure is not None and (measure["counter_volume"] <
                                             last_measure["counter_volume"]):
                # Counter went backwards: bank the pre-reset value.
                usage = usage + last_measure["counter_volume"]
            last_measure = measure

        usage = usage + measurements[-1]["counter_volume"]

        if len(measurements) > 1:
            return usage - measurements[0]["counter_volume"]
        # NOTE(review): with a single sample there is no baseline to
        # subtract.  The original left the result unbound here; returning
        # the lone reading is an assumption -- confirm the intended billing.
        return usage
|
|
|
|
|
|
# Gauge and Delta are the "not a cumulative" sorts of artifact.
class Gauge(Artifact):
    """Artifact for gauge meters (point-in-time readings)."""

    @staticmethod
    def _parse_timestamp(value):
        """Return ``value`` as a datetime, trying both Ceilometer formats."""
        if isinstance(value, datetime.datetime):
            return value
        try:
            return datetime.datetime.strptime(value, date_format)
        except ValueError:
            return datetime.datetime.strptime(value, other_date_format)

    def _timed_samples(self):
        """Return (timestamp, sample) pairs ordered by timestamp.

        The sample dicts themselves are left untouched.
        """
        pairs = [(self._parse_timestamp(sample["timestamp"]), sample)
                 for sample in self.usage]
        pairs.sort(key=lambda pair: pair[0])
        return pairs

    def volume(self):
        """
        Default billable number for this volume: the samples are split
        into blocks of roughly one hour and the maximum "counter_volume"
        of every block is summed.

        :returns: the summed per-block maxima; 0 when there are no samples.
        """
        samples = self._timed_samples()
        if not samples:
            return 0

        blocks = []
        block_start, first = samples[0]
        curr = [first["counter_volume"]]
        for stamp, sample in samples[1:]:
            # A block is closed once a sample lies more than an hour after
            # the sample that opened it.
            if stamp - block_start > datetime.timedelta(hours=1):
                blocks.append(curr)
                curr = [sample["counter_volume"]]
                block_start = stamp
            else:
                curr.append(sample["counter_volume"])

        # this adds the last remaining values as a block of their own on exit
        # might mean people are billed twice for an hour at times...
        # but solves the issue of not billing if there isn't enough data for
        # full hour.
        blocks.append(curr)

        # One maximum per block, summed over the whole period.
        return sum([max(block) for block in blocks])

    def uptime(self, tracked):
        """Calculates uptime accurately for the given 'tracked' states.
        - Will ignore all other states.
        - Relies heavily on the existence of a state meter, and
          should only ever be called on the state meter.

        Returns: uptime in seconds"""
        samples = self._timed_samples()
        uptime = 0.0
        if not samples:
            return uptime

        last_stamp = samples[0][0]
        for stamp, sample in samples[1:]:
            if sample["counter_volume"] in tracked:
                difference = stamp - last_stamp

                # TODO: rethink this:
                # might need to account for sudden jumps
                # caused due to ceilometer down time.
                # The timedelta should be the ceilometer interval; gaps
                # longer than an hour are currently skipped entirely and
                # should be flagged/logged for sys ops to double check.
                if difference <= datetime.timedelta(hours=1):
                    # otherwise just add difference.
                    uptime = uptime + difference.seconds

            last_stamp = stamp

        return uptime
|
|
|
|
|
|
class Delta(Artifact):
    """Artifact for "delta" meters; inherits every behaviour of Artifact."""
|