grace.yu 4c69d120cd Add initial code for user authentication
Change-Id: Ie0c439244f1ae3af707b73ef64b1a411c2aede20
2014-04-14 18:13:04 -07:00

1605 lines
56 KiB
Python

# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Define all the RestfulAPI entry points."""
import logging
import netaddr
import re
import simplejson as json
import sys
from flask import flash
from flask import redirect
from flask import render_template
from flask import request
from flask import session as app_session
from flask import url_for
from flask.ext.restful import Resource
from sqlalchemy.sql import and_
from sqlalchemy.sql import or_
from compass.api import app
from compass.api import auth
from compass.api import errors
from compass.api import login_manager
from compass.api import util
from compass.db import database
from compass.db.model import Adapter
from compass.db.model import Cluster as ModelCluster
from compass.db.model import ClusterHost as ModelClusterHost
from compass.db.model import ClusterState
from compass.db.model import HostState
from compass.db.model import Machine as ModelMachine
from compass.db.model import Role
from compass.db.model import Switch as ModelSwitch
from compass.db.model import SwitchConfig
from compass.db.model import User as ModelUser
from compass.tasks.client import celery
from flask.ext.login import current_user
from flask.ext.login import login_required
from flask.ext.login import login_user
from flask.ext.login import logout_user
from flask.ext.wtf import Form
from wtforms.fields import BooleanField
from wtforms.fields import PasswordField
from wtforms.fields import TextField
from wtforms.validators import Required
@login_manager.header_loader
def load_user_from_token(token):
    """Return a user object from token.

    :param token: the authentication token passed to the header loader
    :returns: the matching user, or None when the token does not resolve
              to a user id or no user with that id exists
    """
    duration = app.config['REMEMBER_COOKIE_DURATION']
    if sys.version_info >= (2, 7):
        max_age = duration.total_seconds()
    else:
        # timedelta.total_seconds() only exists from Python 2.7 onwards,
        # so compute the same value by hand on older interpreters.
        max_age = (duration.microseconds + (
            duration.seconds + duration.days * 24 * 3600) * 1e6) / 1e6

    user_id = auth.get_user_info_from_token(token, max_age)
    if not user_id:
        logging.info("No user can be found from the token!")
        return None

    # The model is imported as ModelUser; return the row itself rather
    # than the (always truthy) query object.
    return ModelUser.query.filter_by(id=user_id).first()
@login_manager.user_loader
def load_user(user_id):
    """Look up the user record whose primary key is ``user_id``.

    :param user_id: the identifier handed over by the login manager
    :returns: whatever ``ModelUser.query.get`` yields for that id
    """
    user_query = ModelUser.query
    return user_query.get(user_id)
@app.route('/restricted')
def restricted():
    """Render the 'restricted.jinja' template."""
    page = render_template('restricted.jinja')
    return page
@app.errorhandler(403)
def forbidden_403(exception):
    """Render the 'forbidden.jinja' page with HTTP status 403.

    :param exception: the error passed in by the error handler (unused)
    """
    body = render_template('forbidden.jinja')
    return body, 403
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the index page."""
    logout_user()
    flash('You have logged out!')
    landing_url = url_for('index')
    return redirect(landing_url)
@app.route('/')
def index():
    """Render the 'index.jinja' template."""
    page = render_template('index.jinja')
    return page
@app.route('/token', methods=['POST'])
def get_token():
    """Authenticate a user by email and password and return a token.

    The request body is parsed as JSON and must contain the keys
    ``email`` and ``password``. On success the user is also logged in.
    """
    payload = json.loads(request.data)
    user = auth.authenticate_user(payload['email'], payload['password'])

    if user:
        token = user.get_auth_token()
        login_user(user)
        return util.make_json_response(
            200, {"status": "OK", "token": token}
        )

    error_msg = "User cannot be found or email and password do not match!"
    return errors.handle_invalid_user_info(
        errors.ObjectDoesNotExist(error_msg)
    )
class LoginForm(Form):
    """Form backing the login page: email, password and remember flag."""

    # Both credentials fields are mandatory.
    email = TextField(
        'Email',
        validators=[Required()])
    password = PasswordField(
        'Password',
        validators=[Required()])
    # Unchecked unless the user asks to be remembered.
    remember = BooleanField(
        'Remember me',
        default=False)
@app.route("/login", methods=['GET', 'POST'])
def login():
    """Show the login form and process submitted credentials.

    Already authenticated users are redirected to the index page. A
    successful login redirects to the ``next`` query argument when given,
    otherwise to the index page. In every other case the login form is
    rendered (again).
    """
    if current_user.is_authenticated():
        return redirect(url_for('index'))

    form = LoginForm()
    if not form.validate_on_submit():
        return render_template('login.jinja', form=form)

    user = auth.authenticate_user(form.email.data, form.password.data)
    if not user:
        flash('Wrong username or password!', 'error')
        return render_template('login.jinja', form=form)

    if not login_user(user, remember=form.remember.data):
        flash('This username is disabled!', 'error')
        return render_template('login.jinja', form=form)

    # Enable session expiration if the user did not choose to be
    # remembered.
    app_session.permanent = not form.remember.data
    flash('Logged in successfully!', 'success')
    target = request.args.get('next') or url_for('index')
    return redirect(target)
class SwitchList(Resource):
    """Query details of switches and poll switches."""

    ENDPOINT = "/switches"

    # Names of the supported query-string parameters.
    SWITCHIP = 'switchIp'
    SWITCHIPNETWORK = 'switchIpNetwork'
    LIMIT = 'limit'

    def get(self):
        """List details of all switches filtered by some conditions.

        .. note::
           switchIp and switchIpNetwork cannot be combined to use.

        :param switchIp: switch IP address (may be given several times)
        :param switchIpNetwork: switch IP network
        :param limit: the number of records expected to return;
                      0 (the default) means no limit
        """

        def get_queried_ip_prefix(network, prefix):
            """Get the IP prefix used as a coarse pattern to query switches.

            Only the octets fully covered by the prefix length are kept,
            so every address inside the network starts with the returned
            string. The exact membership test is done afterwards.

            :param network: dotted network address, e.g. '10.1.0.0'
            :param prefix: the prefix length of the network
            """
            octets_kept = prefix // 8
            if octets_kept <= 0:
                # Less than one full octet is fixed: match everything.
                return ''
            if octets_kept >= 4:
                return network
            return '.'.join(network.split('.')[:octets_kept]) + '.'

        qkeys = request.args.keys()
        logging.info('SwitchList query strings : %s', qkeys)
        switch_list = []

        with database.session() as session:
            switches = []
            switch_ips = request.args.getlist(self.SWITCHIP)
            switch_ip_network = request.args.get(self.SWITCHIPNETWORK,
                                                 type=str)
            limit = request.args.get(self.LIMIT, 0, type=int)

            if switch_ips and switch_ip_network:
                error_msg = ("switchIp and switchIpNetwork cannot be "
                             "specified at the same time!")
                return errors.handle_invalid_usage(
                    errors.UserInvalidUsage(error_msg))

            if limit < 0:
                error_msg = "limit cannot be less than 0!"
                return errors.handle_invalid_usage(
                    errors.UserInvalidUsage(error_msg))

            if switch_ips:
                for ip_addr in switch_ips:
                    ip_addr = str(ip_addr)
                    if not util.is_valid_ip(ip_addr):
                        error_msg = 'SwitchIp format is incorrect!'
                        return errors.handle_invalid_usage(
                            errors.UserInvalidUsage(error_msg))

                    switch = session.query(ModelSwitch).filter_by(
                        ip=ip_addr).first()
                    if switch:
                        switches.append(switch)
                        logging.info('[SwitchList][get] ip %s', ip_addr)

                if limit:
                    switches = switches[:limit]

            elif switch_ip_network:
                # query all switches which belong to the same network
                if not util.is_valid_ipnetowrk(switch_ip_network):
                    error_msg = 'SwitchIpNetwork format is incorrect!'
                    return errors.handle_invalid_usage(
                        errors.UserInvalidUsage(error_msg))

                ip_network = netaddr.IPNetwork(switch_ip_network)
                ip_filter = get_queried_ip_prefix(str(ip_network.network),
                                                  ip_network.prefixlen)
                logging.info('ip_filter is %s', ip_filter)

                candidates = session.query(ModelSwitch).filter(
                    ModelSwitch.ip.startswith(ip_filter)
                ).all()

                # The limit is applied after the exact membership test so
                # that rows which merely share the textual prefix do not
                # use up the quota.
                for switch in candidates:
                    if limit and len(switches) >= limit:
                        break
                    ip_addr = str(switch.ip)
                    if netaddr.IPAddress(ip_addr) in ip_network:
                        switches.append(switch)
                        logging.info('[SwitchList][get] ip %s', ip_addr)

            else:
                query = session.query(ModelSwitch)
                if limit:
                    query = query.limit(limit)
                switches = query.all()

            for switch in switches:
                switch_res = {}
                switch_res['id'] = switch.id
                switch_res['ip'] = switch.ip
                switch_res['state'] = switch.state
                if switch.state != 'under_monitoring':
                    switch_res['err_msg'] = switch.err_msg

                switch_res['link'] = {
                    'rel': 'self',
                    'href': '/'.join((self.ENDPOINT, str(switch.id)))}
                switch_list.append(switch_res)

        logging.info('get switch list: %s', switch_list)
        return util.make_json_response(
            200, {"status": 'OK',
                  "switches": switch_list})

    def post(self):
        """Insert switch IP and the credential to db.

        .. note::
           Invoke a task to poll switch at the same time.

        :param ip: switch IP address
        :param credential: a dict for accessing the switch
        """
        # NOTE(review): the request body and the credential are written to
        # the log below; confirm this is acceptable for the deployment.
        logging.debug('post switch request from curl is %s', request.data)
        json_data = json.loads(request.data)
        ip_addr = json_data['switch']['ip']
        credential = json_data['switch']['credential']

        logging.info('post switch ip_addr=%s credential=%s(%s)',
                     ip_addr, credential, type(credential))

        if not util.is_valid_ip(ip_addr):
            error_msg = "Invalid IP address format!"
            return errors.handle_invalid_usage(
                errors.UserInvalidUsage(error_msg)
            )

        new_switch = {}
        with database.session() as session:
            switch = session.query(ModelSwitch).filter_by(ip=ip_addr).first()
            logging.info('switch for ip %s: %s', ip_addr, switch)

            if switch:
                error_msg = "IP address '%s' already exists" % ip_addr
                value = {'failedSwitch': switch.id}
                return errors.handle_duplicate_object(
                    errors.ObjectDuplicateError(error_msg), value
                )

            switch = ModelSwitch(ip=ip_addr)
            switch.credential = credential
            session.add(switch)
            # Flush so that the generated id is available for the response.
            session.flush()
            new_switch['id'] = switch.id
            new_switch['ip'] = switch.ip
            new_switch['state'] = switch.state
            link = {'rel': 'self',
                    'href': '/'.join((self.ENDPOINT, str(switch.id)))}
            new_switch['link'] = link

        # Sent after the session block has been left.
        celery.send_task("compass.tasks.pollswitch", (ip_addr,))
        logging.info('new switch added: %s', new_switch)
        return util.make_json_response(
            202,
            {
                "status": "accepted",
                "switch": new_switch
            }
        )
class Switch(Resource):
    """Read and update one switch record."""

    ENDPOINT = "/switches"

    def get(self, switch_id):
        """Return the details of one switch.

        :param switch_id: switch ID in db
        """
        switch_res = {}
        with database.session() as session:
            switch = session.query(ModelSwitch).filter_by(
                id=switch_id).first()
            logging.info('switch for id %s: %s', switch_id, switch)

            if not switch:
                error_msg = "Cannot find the switch with id=%s" % switch_id
                logging.debug("[/switches/{id}]error_msg: %s", error_msg)
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg)
                )

            switch_res.update({
                'id': switch.id,
                'ip': switch.ip,
                'state': switch.state,
            })
            # The error message is only reported for switches that are
            # not in the 'under_monitoring' state.
            if switch.state != 'under_monitoring':
                switch_res['err_msg'] = switch.err_msg

            self_href = '/'.join((self.ENDPOINT, str(switch.id)))
            switch_res['link'] = {'rel': 'self', 'href': self_href}

        logging.info('switch info for %s: %s', switch_id, switch_res)
        return util.make_json_response(
            200, {"status": "OK",
                  "switch": switch_res})

    def put(self, switch_id):
        """Replace the credential of a switch and trigger a re-poll.

        :param switch_id: the unique identifier of the switch
        """
        # NOTE(review): the request body and the credential are written to
        # the log below; confirm this is acceptable for the deployment.
        logging.debug('PUT a switch request from curl is %s', request.data)
        ip_addr = None
        switch_res = {}

        with database.session() as session:
            switch = session.query(ModelSwitch).filter_by(
                id=switch_id).first()
            logging.info('PUT switch id is %s: %s', switch_id, switch)

            if not switch:
                # No switch is found.
                error_msg = 'Cannot update a non-existing switch!'
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg))

            payload = json.loads(request.data)
            credential = payload['switch']['credential']

            logging.info('PUT switch id=%s credential=%s(%s)',
                         switch_id, credential, type(credential))

            switch.credential = credential
            switch.state = "repolling"
            switch.err_msg = ""

            ip_addr = switch.ip
            self_href = '/'.join((self.ENDPOINT, str(switch.id)))
            switch_res.update({
                'id': switch.id,
                'ip': switch.ip,
                'state': switch.state,
                'link': {'rel': 'self', 'href': self_href},
            })

        # Sent after the session block has been left.
        celery.send_task("compass.tasks.pollswitch", (ip_addr,))
        return util.make_json_response(
            202, {"status": "accepted",
                  "switch": switch_res})

    def delete(self, switch_id):
        """Reject the request: deleting a switch is not implemented.

        :param switch_id: the unique identifier of the switch.
        """
        err_msg = "The delete API for switch has not been implemented!"
        not_allowed = errors.MethodNotAllowed(err_msg)
        return errors.handle_not_allowed_method(not_allowed)
class MachineList(Resource):
    """Query machines by filters."""

    ENDPOINT = "/machines"

    # Names of the supported query-string parameters.
    SWITCHID = 'switchId'
    MAC = 'mac'
    # NOTE(review): 'vladId' looks like a typo for 'vlanId', but it is the
    # parameter name clients send, so it is kept as is.
    VLANID = 'vladId'
    PORT = 'port'
    LIMIT = 'limit'

    def get(self):
        """Lists details of machines.

        .. note::
           The machines are filtered by some conditions as
           the following. According to SwitchConfig, machines
           with some ports will be filtered.

        :param switchId: the unique identifier of the switch
        :param mac: the MAC address
        :param vladId: the vlan ID
        :param port: the port number
        :param limit: the number of records expected to return;
                      0 (the default) means no limit
        """
        switch_id = request.args.get(self.SWITCHID, type=int)
        mac = request.args.get(self.MAC, None, type=str)
        vlan = request.args.get(self.VLANID, type=int)
        port = request.args.get(self.PORT, None)
        limit = request.args.get(self.LIMIT, 0, type=int)

        if limit < 0:
            error_msg = 'Limit cannot be less than 0!'
            return errors.handle_invalid_usage(
                errors.UserInvalidUsage(error_msg)
            )

        machines_result = []
        with database.session() as session:
            # Build the filter from column expressions so that the values
            # are bound as parameters instead of being pasted into SQL.
            criteria = []
            if switch_id:
                criteria.append(ModelMachine.switch_id == switch_id)

            if mac:
                criteria.append(ModelMachine.mac == mac)

            if vlan:
                criteria.append(ModelMachine.vlan == vlan)

            if port:
                criteria.append(ModelMachine.port == port)

            # TODO(grace): support query filtered port
            query = session.query(ModelMachine)
            if criteria:
                query = query.filter(and_(*criteria))
            machines = query.all()

            # Ports listed in SwitchConfig for a switch are hidden from
            # the result.
            filter_list = session.query(
                ModelSwitch.id, SwitchConfig.filter_port
            ).filter(ModelSwitch.ip == SwitchConfig.ip).all()

            ports_by_id = {}
            for s_id, f_port in filter_list:
                ports_by_id.setdefault(s_id, []).append(f_port)

            for machine in machines:
                if limit and len(machines_result) == limit:
                    break

                if machine.port in ports_by_id.get(machine.switch_id, ()):
                    continue

                machine_res = {}
                machine_res['switch_ip'] = (
                    None if not machine.switch else machine.switch.ip)
                machine_res['id'] = machine.id
                machine_res['mac'] = machine.mac
                machine_res['port'] = machine.port
                machine_res['vlan'] = machine.vlan
                machine_res['link'] = {
                    'rel': 'self',
                    'href': '/'.join((self.ENDPOINT, str(machine.id)))}
                machines_result.append(machine_res)

        logging.info('machines for %s: %s', switch_id, machines_result)
        return util.make_json_response(
            200, {"status": "OK",
                  "machines": machines_result})
class Machine(Resource):
    """Return the details of a single machine looked up by its id."""

    ENDPOINT = '/machines'

    def get(self, machine_id):
        """Return the details of one machine.

        :param machine_id: the unique identifier of the machine
        """
        machine_res = {}
        with database.session() as session:
            machine = session.query(ModelMachine).filter_by(
                id=machine_id).first()
            logging.info('machine for id %s: %s', machine_id, machine)

            if not machine:
                error_msg = "Cannot find the machine with id=%s" % machine_id
                logging.debug("[/api/machines/{id}]error_msg: %s", error_msg)
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg))

            # A machine without an attached switch reports no switch IP.
            switch_ip = machine.switch.ip if machine.switch else None
            self_href = '/'.join((self.ENDPOINT, str(machine.id)))
            machine_res.update({
                'id': machine.id,
                'mac': machine.mac,
                'port': machine.port,
                'vlan': machine.vlan,
                'switch_ip': switch_ip,
                'link': {'rel': 'self', 'href': self_href},
            })

        logging.info('machine info for %s: %s', machine_id, machine_res)
        return util.make_json_response(
            200, {"status": "OK",
                  "machine": machine_res})
class Cluster(Resource):
    """Create clusters and read or update their configuration.

    .. note::
       The configuration is split into the resources ``security``,
       ``networking`` and ``partition``.
    """

    ENDPOINT = '/clusters'

    # Names of the configuration resources of a cluster.
    SECURITY = 'security'
    NETWORKING = 'networking'
    PARTITION = 'partition'

    def get(self, cluster_id, resource=None):
        """Return one cluster, or one of its configuration resources.

        :param cluster_id: the unique identifier of the cluster
        :param resource: the resource name(security, networking, partition)
        """
        # Maps each resource name to the cluster attribute holding it.
        attr_by_resource = {
            self.SECURITY: 'security',
            self.NETWORKING: 'networking',
            self.PARTITION: 'partition',
        }
        cluster_resp = {}
        resp = {}
        with database.session() as session:
            cluster = session.query(ModelCluster).filter_by(
                id=cluster_id).first()
            logging.debug('cluster is %s', cluster)

            if not cluster:
                error_msg = 'Cannot found the cluster with id=%s' % cluster_id
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg)
                )

            if not resource:
                # No resource requested: describe the cluster itself.
                cluster_resp['clusterName'] = cluster.name
                cluster_resp['link'] = {
                    'rel': 'self',
                    'href': '/'.join((self.ENDPOINT, str(cluster.id)))
                }
                cluster_resp['id'] = cluster.id
                resp = {"status": "OK",
                        "cluster": cluster_resp}
            elif resource in attr_by_resource:
                # List resource details
                cluster_resp = getattr(cluster, attr_by_resource[resource])
                resp = {"status": "OK",
                        resource: cluster_resp}
            else:
                error_msg = "Invalid resource name '%s'!" % resource
                return errors.handle_invalid_usage(
                    errors.UserInvalidUsage(error_msg)
                )

            logging.info('get cluster result is %s', cluster_resp)

        return util.make_json_response(200, resp)

    def post(self):
        """Create a new cluster.

        :param name: the name of the cluster
        :param adapter_id: the unique identifier of the adapter
        """
        payload = json.loads(request.data)
        cluster_name = payload['cluster']['name']
        adapter_id = payload['cluster']['adapter_id']
        cluster_resp = {}

        with database.session() as session:
            existing = session.query(ModelCluster).filter_by(
                name=cluster_name).first()
            if existing:
                error_msg = "Cluster name '%s' already exists!" % existing.name
                return errors.handle_duplicate_object(
                    errors.ObjectDuplicateError(error_msg))

            adapter = session.query(Adapter).filter_by(id=adapter_id).first()
            if not adapter:
                error_msg = "No adapter id=%s can be found!" % adapter_id
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg))

            # Create a new cluster in database
            cluster = ModelCluster(name=cluster_name, adapter_id=adapter_id)
            session.add(cluster)
            # Flush so that the generated id is available for the response.
            session.flush()

            self_href = '/'.join((self.ENDPOINT, str(cluster.id)))
            cluster_resp.update({
                'id': cluster.id,
                'name': cluster.name,
                'adapter_id': cluster.adapter_id,
                'link': {'rel': 'self', 'href': self_href},
            })

        return util.make_json_response(
            200,
            {
                "status": "OK",
                "cluster": cluster_resp
            }
        )

    def put(self, cluster_id, resource):
        """Update one configuration resource of the specified cluster.

        The request body must contain the new value under a key equal to
        the resource name.

        :param cluster_id: the unique identifier of the cluster
        :param resource: resource name(security, networking, partition)
        """
        # For each resource: the util validator to call and the cluster
        # column that stores the serialized value.
        resources = {
            self.SECURITY: {'validator': 'is_valid_security_config',
                            'column': 'security_config'},
            self.NETWORKING: {'validator': 'is_valid_networking_config',
                              'column': 'networking_config'},
            self.PARTITION: {'validator': 'is_valid_partition_config',
                             'column': 'partition_config'},
        }
        request_data = json.loads(request.data)

        with database.session() as session:
            cluster = session.query(ModelCluster).filter_by(
                id=cluster_id).first()

            if not cluster:
                error_msg = 'You are trying to update a non-existing cluster!'
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg)
                )

            if resource not in request_data:
                error_msg = "Invalid resource name '%s'" % resource
                return errors.handle_invalid_usage(
                    errors.UserInvalidUsage(error_msg)
                )

            value = request_data[resource]

            if resource not in resources:
                error_msg = "Invalid resource name '%s'" % resource
                return errors.handle_invalid_usage(
                    errors.UserInvalidUsage(error_msg)
                )

            spec = resources[resource]
            validator = getattr(util, spec['validator'])
            is_valid, msg = validator(value)

            if not is_valid:
                return errors.handle_mssing_input(
                    errors.InputMissingError(msg)
                )

            session.query(ModelCluster).filter_by(id=cluster_id).update(
                {spec['column']: json.dumps(value)}
            )

        return util.make_json_response(
            200, {"status": "OK"}
        )
@app.route("/clusters", methods=['GET'])
def list_clusters():
    """List all clusters, optionally filtered by deployment state.

    The optional ``state`` query argument accepts 'undeployed',
    'installing', 'failed' and 'successful'. Any other non-empty value
    yields an empty list.
    """
    endpoint = '/clusters'
    state = request.args.get('state', None, type=str)

    # ClusterState values that correspond to each state filter.
    states_by_filter = {
        'installing': ('INSTALLING', 'UNINITIALIZED'),
        'failed': ('ERROR',),
        'successful': ('READY',),
    }

    results = []
    with database.session() as session:
        clusters = []
        if not state:
            # Get all clusters
            clusters = session.query(ModelCluster).all()
        elif state == 'undeployed':
            # A cluster without a ClusterState row has not been deployed.
            deployed_ids = [
                row[0] for row in session.query(ClusterState.id).all()
            ]
            clusters = session.query(ModelCluster).filter(
                ~ModelCluster.id.in_(deployed_ids)
            ).all()
        elif state in states_by_filter:
            wanted = [
                ClusterState.state == name
                for name in states_by_filter[state]
            ]
            clusters = session.query(ModelCluster).filter(
                ModelCluster.id == ClusterState.id,
                or_(*wanted)
            ).all()

        for cluster in clusters:
            results.append({
                'clusterName': cluster.name,
                'id': cluster.id,
                'link': {
                    "href": "/".join((endpoint, str(cluster.id))),
                    "rel": "self"},
            })

    return util.make_json_response(
        200, {
            "status": "OK",
            "clusters": results
        }
    )
@app.route("/clusters/<int:cluster_id>/action", methods=['POST'])
def execute_cluster_action(cluster_id):
    """Execute the specified action to the cluster.

    The request body is a JSON object whose key names the action and
    whose value is the list of ids the action applies to.

    :param cluster_id: the unique identifier of the cluster
    :param addHosts: the action of adding expected hosts to the cluster
    :param removeHosts: the action of removing expected hosts from the cluster
    :param replaceAllHosts: the action of removing all existing hosts in
                            cluster and add new hosts
    :param deploy: the action of starting to deploy
    """
    def _add_hosts(cluster_id, hosts):
        """Add cluster host(s) to the cluster by cluster_id."""
        cluster_hosts = []
        available_machines = []
        failed_machines = []
        with database.session() as session:
            for host in hosts or []:
                # Check if machine exists
                machine = session.query(
                    ModelMachine).filter_by(id=host).first()
                if not machine:
                    error_msg = "Machine id=%s does not exist!" % host
                    return errors.handle_not_exist(
                        errors.ObjectDoesNotExist(error_msg)
                    )

                clusterhost = session.query(
                    ModelClusterHost).filter_by(machine_id=host).first()
                if clusterhost:
                    # Machine is already used
                    failed_machines.append(clusterhost.machine_id)
                    continue

                # Add the available machine to available_machines list
                available_machines.append(machine)

            if failed_machines:
                value = {
                    'failedMachines': failed_machines
                }
                error_msg = "Conflict!"
                return errors.handle_duplicate_object(
                    errors.ObjectDuplicateError(error_msg), value
                )

            for machine in available_machines:
                host = ModelClusterHost(cluster_id=cluster_id,
                                        machine_id=machine.id)
                session.add(host)
                # Flush so that the generated host id can be reported.
                session.flush()
                cluster_hosts.append({
                    'id': host.id,
                    'machine_id': machine.id,
                })

        logging.info('cluster_hosts result is %s', cluster_hosts)
        return util.make_json_response(
            200, {
                "status": "OK",
                "cluster_hosts": cluster_hosts
            }
        )

    def _remove_hosts(cluster_id, hosts):
        """Remove existing cluster host from the cluster."""
        hosts = hosts or []
        removed_hosts = []
        with database.session() as session:
            failed_hosts = []
            for host_id in hosts:
                host = session.query(
                    ModelClusterHost
                ).filter_by(
                    id=host_id, cluster_id=cluster_id
                ).first()

                if not host:
                    failed_hosts.append(host_id)
                    continue

                removed_hosts.append({
                    "id": host_id,
                    "machine_id": host.machine_id
                })

            if failed_hosts:
                error_msg = 'Hosts do not exist! Or not in this cluster'
                value = {
                    "failedHosts": failed_hosts
                }
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg), value
                )

            # Delete the requested hosts from database. The ids are bound
            # as parameters and the delete is limited to this cluster; an
            # empty request must not delete anything.
            if hosts:
                session.query(ModelClusterHost).filter(
                    ModelClusterHost.cluster_id == cluster_id,
                    ModelClusterHost.id.in_(hosts)
                ).delete(synchronize_session='fetch')

        return util.make_json_response(
            200, {
                "status": "OK",
                "cluster_hosts": removed_hosts
            }
        )

    def _replace_all_hosts(cluster_id, hosts):
        """Remove all existing hosts from the cluster and add new ones."""
        with database.session() as session:
            # Delete all existing hosts of the cluster
            session.query(ModelClusterHost).filter_by(
                cluster_id=cluster_id).delete()

        return _add_hosts(cluster_id, hosts)

    def _deploy(cluster_id, hosts):
        """Deploy the cluster."""
        # Work on a copy; a missing value means "all hosts of the cluster".
        hosts = list(hosts) if hosts else []
        deploy_hosts_info = []
        deploy_cluster_info = {}
        with database.session() as session:
            if not hosts:
                # Deploy all hosts in the cluster
                cluster_hosts = session.query(
                    ModelClusterHost).filter_by(cluster_id=cluster_id).all()

                if not cluster_hosts:
                    # No host belongs to this cluster
                    error_msg = (
                        'Cannot find any host in cluster id=%s' % cluster_id)
                    return errors.handle_not_exist(
                        errors.ObjectDoesNotExist(error_msg))

                for host in cluster_hosts:
                    if not host.mutable:
                        # The host is not allowed to modified
                        error_msg = (
                            'The host id=%s is not allowed to be '
                            'modified now!'
                        ) % host.id
                        return errors.handle_invalid_usage(
                            errors.UserInvalidUsage(error_msg))

                    hosts.append(host.id)

            deploy_cluster_info["cluster_id"] = int(cluster_id)
            deploy_cluster_info["url"] = '/clusters/%s/progress' % cluster_id

            for host_id in hosts:
                deploy_hosts_info.append({
                    "host_id": host_id,
                    "url": '/cluster_hosts/%s/progress' % host_id,
                })

            # Lock cluster hosts and its cluster
            session.query(ModelClusterHost).filter_by(
                cluster_id=cluster_id).update({'mutable': False})
            session.query(ModelCluster).filter_by(
                id=cluster_id).update({'mutable': False})

            # Clean up cluster_state and host_state table
            session.query(ClusterState).filter_by(id=cluster_id).delete()
            for host_id in hosts:
                session.query(HostState).filter_by(id=host_id).delete()

        # Sent after the session block has been left.
        celery.send_task("compass.tasks.deploy", ({cluster_id: hosts},))
        return util.make_json_response(
            202, {
                "status": "accepted",
                "deployment": {
                    "cluster": deploy_cluster_info,
                    "hosts": deploy_hosts_info
                }
            }
        )

    with database.session() as session:
        cluster = session.query(ModelCluster).filter_by(id=cluster_id).first()
        if not cluster:
            error_msg = 'Cluster id=%s does not exist!' % cluster_id
            return errors.handle_not_exist(
                errors.ObjectDoesNotExist(error_msg)
            )

        if not cluster.mutable:
            # The cluster cannot be deploy again
            error_msg = ("The cluster id=%s is not allowed to "
                         "modified or deployed!" % cluster_id)
            return errors.handle_invalid_usage(
                errors.UserInvalidUsage(error_msg))

    request_data = json.loads(request.data)
    if not isinstance(request_data, dict):
        return errors.handle_invalid_usage(
            errors.UserInvalidUsage(
                '%s action is not support!' % request_data)
        )

    # Checked in this order; the value is taken from the key of the action
    # that is actually executed.
    handlers = (
        ('addHosts', _add_hosts),
        ('removeHosts', _remove_hosts),
        ('deploy', _deploy),
        ('replaceAllHosts', _replace_all_hosts),
    )
    for action, handler in handlers:
        if action in request_data:
            return handler(cluster_id, request_data[action])

    # An empty body has no key to report.
    action = next(iter(request_data), None)
    return errors.handle_invalid_usage(
        errors.UserInvalidUsage('%s action is not support!' % action)
    )
class ClusterHostConfig(Resource):
    """Read, update and partially reset a cluster host's configuration."""

    def get(self, host_id):
        """Return the configuration of one cluster host.

        :param host_id: the unique identifier of the host
        """
        config_res = {}
        with database.session() as session:
            host = session.query(ModelClusterHost).filter_by(
                id=host_id).first()
            if not host:
                # The host does not exist.
                error_msg = "The host id=%s does not exist!" % host_id
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg))

            config_res = host.config

        logging.debug("The config of host id=%s is %s", host_id, config_res)
        return util.make_json_response(
            200, {"status": "OK",
                  "config": config_res})

    def put(self, host_id):
        """Update the configuration of one cluster host.

        A ``hostname`` key in the request body is stored on the host
        itself; the remaining keys are validated and stored as config.

        :param host_id: the unique identifier of the host
        """
        with database.session() as session:
            host = session.query(ModelClusterHost).filter_by(
                id=host_id).first()
            if not host:
                error_msg = "The host id=%s does not exist!" % host_id
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg))

            logging.debug("cluster config put request.data %s", request.data)
            request_data = json.loads(request.data)
            if not request_data:
                error_msg = "request data is %s" % request_data
                return errors.handle_mssing_input(
                    errors.InputMissingError(error_msg))

            if not host.mutable:
                error_msg = "The host 'id=%s' is not mutable!" % host_id
                return errors.handle_invalid_usage(
                    errors.UserInvalidUsage(error_msg))

            # Valid if keywords in request_data are all correct
            if 'hostname' in request_data:
                hostname = request_data['hostname']
                cluster_id = host.cluster_id
                # The hostname must not be used by another host of the
                # same cluster.
                name_owner = session.query(ModelClusterHost).filter_by(
                    cluster_id=cluster_id, hostname=hostname).first()
                if name_owner and name_owner.id != int(host_id):
                    error_msg = ("Hostname '%s' has been used for other host "
                                 "in the cluster, cluster ID is %s! %s %s"
                                 % (hostname, cluster_id,
                                    name_owner.id, host_id))

                    return errors.handle_invalid_usage(
                        errors.UserInvalidUsage(error_msg))

                session.query(ModelClusterHost).filter_by(
                    id=host_id).update({"hostname": hostname})
                del request_data['hostname']

            try:
                util.valid_host_config(request_data)
            except errors.UserInvalidUsage as exc:
                return errors.handle_invalid_usage(exc)

            host.config = request_data

        return util.make_json_response(
            200, {"status": "OK"})

    def delete(self, host_id, subkey):
        """Reset one attribute in the configuration of a cluster host.

        :param host_id: the unique identifier of the host
        :param subkey: the attribute name in configuration
        """
        available_delete_keys = ['roles']
        with database.session() as session:
            host = session.query(ModelClusterHost).filter_by(
                id=host_id).first()
            if not host:
                error_msg = "The host id=%s does not exist!" % host_id
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg))

            if subkey not in available_delete_keys:
                error_msg = "subkey %s is not supported!" % subkey
                return errors.handle_invalid_usage(
                    errors.UserInvalidUsage(error_msg))

            if not host.mutable:
                error_msg = "The host 'id=%s' is not mutable!" % host_id
                return errors.handle_invalid_usage(
                    errors.UserInvalidUsage(error_msg))

            stored_config = json.loads(host.config_data)
            # Set the subkey's value to ""
            util.update_dict_value(subkey, stored_config)
            host.config = stored_config

        return util.make_json_response(
            200, {"status": "OK"})
class ClusterHost(Resource):
    """List details of the cluster host by host id."""

    ENDPOINT = '/clusterhosts'

    def get(self, host_id):
        """Lists details of the specified cluster host.

        :param host_id: the unique identifier of the host
        """
        host_res = {}
        with database.session() as session:
            host = session.query(
                ModelClusterHost).filter_by(id=host_id).first()
            if not host:
                error_msg = "The host id=%s does not exist!" % host_id
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg))

            # A machine may have no switch attached; report None in that
            # case instead of failing, like the machine endpoints do.
            machine = host.machine
            switch = machine.switch if machine else None

            host_res['hostname'] = host.hostname
            host_res['mutable'] = host.mutable
            host_res['id'] = host.id
            host_res['switch_ip'] = switch.ip if switch else None
            host_res['link'] = {
                "href": '/'.join((self.ENDPOINT, str(host.id))),
                "rel": "self"
            }

        return util.make_json_response(
            200, {"status": "OK",
                  "cluster_host": host_res})
@app.route("/clusterhosts", methods=['GET'])
def list_clusterhosts():
    """Lists details of all cluster hosts.

    .. note::
       the cluster hosts are optionally filtered by some conditions.

    :param hostname: the name of the host
    :param clustername: the name of the cluster
    """
    endpoint = '/clusterhosts'
    key_hostname = 'hostname'
    key_clustername = 'clustername'

    hosts_list = []
    hostname = request.args.get(key_hostname, None, type=str)
    clustername = request.args.get(key_clustername, None, type=str)

    with database.session() as session:
        hosts = None
        if hostname and clustername:
            hosts = session.query(
                ModelClusterHost
            ).join(ModelCluster).filter(
                ModelClusterHost.hostname == hostname,
                ModelCluster.name == clustername
            ).all()
        elif hostname:
            hosts = session.query(
                ModelClusterHost).filter_by(hostname=hostname).all()
        elif clustername:
            cluster = session.query(
                ModelCluster).filter_by(name=clustername).first()
            # An unknown cluster name yields an empty result.
            if cluster:
                hosts = cluster.hosts
        else:
            hosts = session.query(ModelClusterHost).all()

        for host in hosts or []:
            # A machine may have no switch attached; report None in that
            # case instead of failing, like the machine endpoints do.
            machine = host.machine
            switch = machine.switch if machine else None

            host_res = {}
            host_res['hostname'] = host.hostname
            host_res['mutable'] = host.mutable
            host_res['id'] = host.id
            host_res['switch_ip'] = switch.ip if switch else None
            host_res['link'] = {
                "href": '/'.join((endpoint, str(host.id))),
                "rel": "self"}
            hosts_list.append(host_res)

    return util.make_json_response(
        200, {"status": "OK",
              "cluster_hosts": hosts_list})
@app.route("/adapters/<int:adapter_id>", methods=['GET'])
def list_adapter(adapter_id):
    """Lists details of the specified adapter.

    Responds with a 200 JSON body holding the adapter's name, os, id,
    target_system and a self link, or with the not-exist error response
    when no adapter has the given id.

    :param adapter_id: the unique identifier of the adapter
    """
    endpoint = '/adapters'
    adapter_res = {}
    with database.session() as session:
        adapter = session.query(Adapter).filter_by(id=adapter_id).first()
        if not adapter:
            error_msg = "Adapter id=%s does not exist!" % adapter_id
            return errors.handle_not_exist(
                errors.ObjectDoesNotExist(error_msg))

        adapter_res['name'] = adapter.name
        adapter_res['os'] = adapter.os
        adapter_res['id'] = adapter.id
        # No trailing comma here: a stray one wraps the value in a 1-tuple,
        # which would be serialized as a JSON array instead of the scalar
        # that list_adapters returns for the same field.
        adapter_res['target_system'] = adapter.target_system
        adapter_res['link'] = {
            "href": "/".join((endpoint, str(adapter.id))),
            "rel": "self"}

    return util.make_json_response(
        200, {"status": "OK",
              "adapter": adapter_res})
@app.route("/adapters/<int:adapter_id>/roles", methods=['GET'])
def list_adapter_roles(adapter_id):
    """Lists the name and description of every role of the given adapter.

    A role belongs to the adapter when both share the same target_system.

    :param adapter_id: the unique identifier of the adapter
    """
    with database.session() as session:
        adapter = session.query(Adapter).filter_by(id=adapter_id).first()
        if not adapter:
            error_msg = "Adapter id=%s does not exist!" % adapter_id
            return errors.handle_not_exist(
                errors.ObjectDoesNotExist(error_msg)
            )

        # Each result row is a (Role, Adapter) pair; only the role is used.
        matches = session.query(
            Role, Adapter
        ).filter(
            Adapter.id == adapter_id,
            Adapter.target_system == Role.target_system
        ).all()

        roles_list = [
            {'name': role.name, 'description': role.description}
            for role, _ in matches
        ]

    return util.make_json_response(
        200, {
            "status": "OK",
            "roles": roles_list
        }
    )
@app.route("/adapters", methods=['GET'])
def list_adapters():
    """Lists details of the adapters, optionally filtered by adapter name.

    :param name: the name of the adapter (query-string argument)
    """
    endpoint = '/adapters'
    name = request.args.get('name', type=str)

    adapter_list = []
    with database.session() as session:
        query = session.query(Adapter)
        found = query.filter_by(name=name).all() if name else query.all()

        for adapter in found:
            adapter_list.append({
                'name': adapter.name,
                'os': adapter.os,
                'target_system': adapter.target_system,
                'id': adapter.id,
                'link': {
                    "href": "/".join((endpoint, str(adapter.id))),
                    "rel": "self"},
            })

    return util.make_json_response(
        200, {"status": "OK",
              "adapters": adapter_list})
class HostInstallingProgress(Resource):
    """Get host installing progress information."""

    def get(self, host_id):
        """Lists progress details of a specific cluster host.

        A host without any recorded state is reported as UNINITIALIZED
        with 0 percent progress.

        :param host_id: the unique identifier of the host
        """
        with database.session() as session:
            host = session.query(ModelClusterHost).filter_by(
                id=host_id).first()
            if not host:
                error_msg = "The host id=%s does not exist!" % host_id
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg))

            host_state = host.state
            if host_state:
                progress_result = {
                    'id': host_id,
                    'state': host_state.state,
                    'percentage': host_state.progress,
                    'message': host_state.message,
                    'severity': host_state.severity,
                }
            else:
                # Nothing recorded yet: report a placeholder progress.
                progress_result = {
                    'id': host_id,
                    'state': 'UNINITIALIZED',
                    'percentage': 0,
                    'message': "Waiting..............",
                    'severity': "INFO",
                }

        logging.info('progress result for %s: %s', host_id, progress_result)
        return util.make_json_response(
            200, {"status": "OK",
                  "progress": progress_result})
class ClusterInstallingProgress(Resource):
    """Get cluster installing progress information."""

    def get(self, cluster_id):
        """Lists progress details of a specific cluster.

        A cluster without any recorded state is reported as UNINITIALIZED
        with 0 percent progress.

        :param cluster_id: the unique identifier of the cluster
        """
        with database.session() as session:
            cluster = session.query(ModelCluster).filter_by(
                id=cluster_id).first()
            if not cluster:
                error_msg = "The cluster id=%s does not exist!" % cluster_id
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg))

            cluster_state = cluster.state
            if cluster_state:
                progress_result = {
                    'id': cluster_id,
                    'state': cluster_state.state,
                    'percentage': cluster_state.progress,
                    'message': cluster_state.message,
                    'severity': cluster_state.severity,
                }
            else:
                # Nothing recorded yet: report a placeholder progress.
                progress_result = {
                    'id': cluster_id,
                    'state': 'UNINITIALIZED',
                    'percentage': 0,
                    'message': "Waiting..............",
                    'severity': "INFO"
                }

        logging.info('progress result for cluster %s: %s',
                     cluster_id, progress_result)
        return util.make_json_response(
            200, {"status": "OK",
                  "progress": progress_result})
class DashboardLinks(Resource):
    """Lists dashboard links."""
    ENDPOINT = "/dashboardlinks/"

    def get(self):
        """Return the dashboard links of the hosts of a cluster.

        The cluster is selected with the ``cluster_id`` query-string
        argument. For every host whose config has a truthy
        ``has_dashboard_roles`` entry, each of the host's roles is mapped
        to ``http://<management ip>``.
        """
        cluster_id = request.args.get('cluster_id', None)
        logging.info('get cluster links with cluster_id=%s', cluster_id)

        links = {}
        with database.session() as session:
            hosts = session.query(
                ModelClusterHost
            ).filter_by(cluster_id=cluster_id).all()
            if not hosts:
                error_msg = "Cannot find hosts in cluster id=%s" % cluster_id
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg)
                )

            for host in hosts:
                config = host.config
                if not config.get('has_dashboard_roles'):
                    continue

                # Walk networking -> interfaces -> management -> ip,
                # falling back to an empty string when any level is absent.
                networking = config.get('networking', {})
                interfaces = networking.get('interfaces', {})
                management = interfaces.get('management', {})
                ip_addr = management.get('ip', '')

                for role in config.get('roles', []):
                    links[role] = 'http://%s' % ip_addr

        return util.make_json_response(
            200, {
                "status": "OK",
                "dashboardlinks": links
            }
        )
class User(Resource):
    """Get user information."""
    ENDPOINT = '/users'

    @login_required
    def get(self, user_id):
        """Get user's information for specified ID.

        :param user_id: the unique identifier of the user
        """
        with database.session() as session:
            user = session.query(ModelUser).filter_by(id=user_id).first()
            if not user:
                error_msg = "Cannot find the User which id is %s" % user_id
                return errors.handle_not_exist(
                    errors.ObjectDoesNotExist(error_msg)
                )

            user_info = {
                'id': user_id,
                'email': user.email,
                'link': {
                    "href": "/".join((self.ENDPOINT, str(user_id))),
                    "rel": "self"
                },
            }

        return util.make_json_response(
            200, {"status": "OK",
                  "user": user_info}
        )
# Tables that export_csv can dump, keyed by the table name accepted in the
# "/export/<tname>" URL. For each entry, 'name' is the model class that gets
# queried and 'columns' lists the attributes read from every record, in
# output order.
TABLES = {
    'switch_config': {
        'name': SwitchConfig,
        'columns': ['id', 'ip', 'filter_port']
    },
    'switch': {
        'name': ModelSwitch,
        'columns': ['id', 'ip', 'credential_data']
    },
    'machine': {
        'name': ModelMachine,
        'columns': ['id', 'mac', 'port', 'vlan', 'switch_id']
    },
    'cluster': {
        'name': ModelCluster,
        'columns': [
            'id', 'name', 'security_config',
            'networking_config', 'partition_config',
            'adapter_id', 'state'
        ]
    },
    'cluster_host': {
        'name': ModelClusterHost,
        'columns': [
            'id', 'cluster_id', 'hostname', 'machine_id',
            'config_data', 'state'
        ]
    },
    'adapter': {
        'name': Adapter,
        'columns': ['id', 'name', 'os', 'target_system']
    },
    'role': {
        'name': Role,
        'columns': ['id', 'name', 'target_system', 'description']
    }
}
# app.route must be the outermost decorator: it registers whatever function
# it receives, so login_required has to be applied before it for the login
# check to run on requests.
@app.route("/export/<string:tname>", methods=['GET'])
@login_required
def export_csv(tname):
    """Export the records of a supported table as a CSV response.

    The first line holds the column headers and every following line one
    record. Column values that look like a JSON object are expanded into
    several columns by the util helpers. For 'cluster_host' an extra
    'deploy_action' column, always 0, is appended.

    :param tname: the name of the table to export; must be a key of TABLES
    """
    if tname not in TABLES:
        error_msg = (
            "Table '%s' is not supported to export or wrong table name"
            % tname
        )
        # NOTE(review): every other handler in this module lives in
        # `errors`; confirm util really provides handle_invalid_usage.
        return util.handle_invalid_usage(
            errors.UserInvalidUsage(error_msg)
        )

    table = TABLES[tname]['name']
    colnames = TABLES[tname]['columns']
    # Matches the string form of a value that looks like a JSON object.
    dict_like = re.compile(r'^{(.*:.*[,]*)}')
    t_headers = []
    rows = []
    with database.session() as session:
        records = session.query(table).all()

        # Get headers of the table
        if not records:
            # Copy the list: the headers may be appended to below and the
            # shared column list in TABLES must not be modified.
            t_headers = list(colnames)
        else:
            # The header layout is derived from the first record only.
            first_record = records[0]
            for col in colnames:
                value = getattr(first_record, col)
                if dict_like.match(str(value)):
                    # value is dict
                    util.get_headers_from_dict(t_headers, col,
                                               json.loads(value))
                else:
                    t_headers.append(col)

        # Get columns values
        for entry in records:
            tmp = []
            for col in colnames:
                if col == 'state':
                    # 'state' is a related object; export its state name.
                    tmp.append(entry.state.state if entry.state else None)
                    continue

                value = str(json.loads(json.dumps(getattr(entry, col))))
                if dict_like.match(value):
                    util.get_col_val_from_dict(tmp, json.loads(value))
                else:
                    tmp.append(value)

            rows.append(tmp)

    if tname == 'cluster_host':
        t_headers.append('deploy_action')
        for row in rows:
            row.append(0)

    lines = [','.join(str(x) for x in t_headers)]
    lines.extend(','.join(str(x) for x in row) for row in rows)
    result = '\n'.join(lines)

    return util.make_csv_response(200, result, tname)
# Register every Resource class of this module together with the URL
# rule(s) it serves. A class given several rules (Cluster,
# ClusterHostConfig, User) is reachable under each of them.
# NOTE(review): presumably util.add_resource wraps flask-restful's
# Api.add_resource -- confirm in compass.api.util.
util.add_resource(SwitchList, '/switches')
util.add_resource(Switch, '/switches/<int:switch_id>')
util.add_resource(MachineList, '/machines')
util.add_resource(Machine, '/machines/<int:machine_id>')
util.add_resource(Cluster,
                  '/clusters',
                  '/clusters/<int:cluster_id>',
                  '/clusters/<int:cluster_id>/<string:resource>')
util.add_resource(ClusterHostConfig,
                  '/clusterhosts/<int:host_id>/config',
                  '/clusterhosts/<int:host_id>/config/<string:subkey>')
util.add_resource(ClusterHost, '/clusterhosts/<int:host_id>')
util.add_resource(HostInstallingProgress,
                  '/clusterhosts/<int:host_id>/progress')
util.add_resource(ClusterInstallingProgress,
                  '/clusters/<int:cluster_id>/progress')
util.add_resource(DashboardLinks, '/dashboardlinks')
util.add_resource(User,
                  '/users',
                  '/users/<int:user_id>')
# Start the application directly when this module is executed as a script.
# NOTE(review): debug=True turns on Flask's debug mode; confirm this entry
# point is only ever used for local development.
if __name__ == '__main__':
    app.run(debug=True)