Keystone Integration

Integrated Keystone Middleware into api
Simple oslo.config integration
Deleted legacy airflow_* controllers since they don't align with the exposed interface of shipyard

Change-Id: I9d88d1bccaec80bf9fdbcccafb985c5414285f6b
This commit is contained in:
One-Fine-Day 2017-08-22 14:07:32 -05:00
parent 77813d9410
commit d21e428909
26 changed files with 854 additions and 970 deletions

View File

@ -1,10 +1,27 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PasteDeploy==1.5.2
keystonemiddleware==4.17.0
falcon==1.2.0
python-dateutil==2.6.1
requests==2.18.4
uwsgi===2.0.15
uwsgi==2.0.15
configparser==3.5.0
python-openstackclient==3.11.0
SQLAlchemy==1.1.13
psycopg2==2.7.3.1
PasteDeploy==1.5.2
keystonemiddleware===4.9.1
oslo.config==4.11.0
oslo.policy==1.25.1
keystoneauth1==2.13.0

View File

@ -14,19 +14,25 @@
from setuptools import setup
setup(name='shipyard_airflow',
version='0.1a1',
description='API for managing Airflow-based orchestration',
url='http://github.com/att-comdev/shipyard',
author='Anthony Lin - AT&T',
author_email='al498u@att.com',
license='Apache 2.0',
packages=['shipyard_airflow',
'shipyard_airflow.control'],
install_requires=[
'falcon',
'requests',
'configparser',
'uwsgi>1.4',
'python-dateutil'
])
setup(
name='shipyard_airflow',
version='0.1a1',
description='API for managing Airflow-based orchestration',
url='http://github.com/att-comdev/shipyard',
author='Anthony Lin - AT&T',
author_email='al498u@att.com',
license='Apache 2.0',
packages=['shipyard_airflow', 'shipyard_airflow.control'],
entry_points={
"oslo.policy.policies":
["shipyard = shipyard.common.policies:list_rules"],
"oslo.config.opts": ["shipyard = shipyard.conf.opts:list_opts"]
},
install_requires=[
'falcon',
'requests',
'configparser',
'uwsgi>1.4',
'python-dateutil',
'oslo.config',
])

202
shipyard_airflow/config.py Normal file
View File

@ -0,0 +1,202 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Single point of entry to generate the sample configuration file.
This module collects all the necessary info from the other modules in this
package. It is assumed that:
* Every other module in this package has a 'list_opts' function which
returns a dict where:
* The keys are strings which are the group names.
* The value of each key is a list of config options for that group.
* The conf package doesn't have further packages with config options.
* This module is only used in the context of sample file generation.
"""
import importlib
import os
import pkgutil
from oslo_config import cfg
import keystoneauth1.loading as loading
IGNORED_MODULES = ('shipyard', 'config')
if (os.path.exists('etc/shipyard/shipyard.conf')):
cfg.CONF(['--config-file', 'etc/shipyard/shipyard.conf'])
class ShipyardConfig(object):
"""
Initialize all the core options
"""
# Default options
options = [
cfg.IntOpt(
'poll_interval',
default=10,
help=[
'''Polling interval in seconds for checking subtask or
downstream status'''
]),
]
# Logging options
logging_options = [
cfg.StrOpt(
'log_level', default='INFO', help='Global log level for Shipyard'),
cfg.StrOpt(
'global_logger_name',
default='shipyard',
help='Logger name for the top-level logger'),
]
# Enabled plugins
plugin_options = [
cfg.MultiStrOpt(
'ingester',
default=['shipyard_airflow.ingester.plugins.yaml.YamlIngester'],
help='Module path string of a input ingester to enable'),
cfg.MultiStrOpt(
'oob_driver',
default=[
'shipyard_airflow.drivers.oob.pyghmi_driver.PyghmiDriver'
],
help='Module path string of a OOB driver to enable'),
cfg.StrOpt(
'node_driver',
default=[
'''shipyard_airflow.drivers.node.maasdriver.driver
.MaasNodeDriver'''
],
help='Module path string of the Node driver to enable'),
# TODO Network driver not yet implemented
cfg.StrOpt(
'network_driver',
default=None,
help='Module path string of the Network driver enable'),
]
# Timeouts for various tasks specified in minutes
timeout_options = [
cfg.IntOpt(
'shipyard_timeout',
default=5,
help='Fallback timeout when a specific one is not configured'),
cfg.IntOpt(
'create_network_template',
default=2,
help='Timeout in minutes for creating site network templates'),
cfg.IntOpt(
'configure_user_credentials',
default=2,
help='Timeout in minutes for creating user credentials'),
cfg.IntOpt(
'identify_node',
default=10,
help='Timeout in minutes for initial node identification'),
cfg.IntOpt(
'configure_hardware',
default=30,
help=[
'''Timeout in minutes for node commissioning and
hardware configuration'''
]),
cfg.IntOpt(
'apply_node_networking',
default=5,
help='Timeout in minutes for configuring node networking'),
cfg.IntOpt(
'apply_node_platform',
default=5,
help='Timeout in minutes for configuring node platform'),
cfg.IntOpt(
'deploy_node',
default=45,
help='Timeout in minutes for deploying a node'),
]
def __init__(self):
self.conf = cfg.CONF
def register_options(self):
self.conf.register_opts(ShipyardConfig.options)
self.conf.register_opts(
ShipyardConfig.logging_options, group='logging')
self.conf.register_opts(ShipyardConfig.plugin_options, group='plugins')
self.conf.register_opts(
ShipyardConfig.timeout_options, group='timeouts')
self.conf.register_opts(
loading.get_auth_plugin_conf_options('password'),
group='keystone_authtoken')
config_mgr = ShipyardConfig()
def list_opts():
opts = {
'DEFAULT': ShipyardConfig.options,
'logging': ShipyardConfig.logging_options,
'plugins': ShipyardConfig.plugin_options,
'timeouts': ShipyardConfig.timeout_options
}
package_path = os.path.dirname(os.path.abspath(__file__))
parent_module = ".".join(__name__.split('.')[:-1])
module_names = _list_module_names(package_path, parent_module)
imported_modules = _import_modules(module_names)
_append_config_options(imported_modules, opts)
# Assume we'll use the password plugin,
# so include those options in the configuration template
opts['keystone_authtoken'] = loading.get_auth_plugin_conf_options(
'password')
return _tupleize(opts)
def _tupleize(d):
"""Convert a dict of options to the 2-tuple format."""
return [(key, value) for key, value in d.items()]
def _list_module_names(pkg_path, parent_module):
module_names = []
for _, module_name, ispkg in pkgutil.iter_modules(path=[pkg_path]):
if module_name in IGNORED_MODULES:
# Skip this module.
continue
elif ispkg:
module_names.extend(
_list_module_names(pkg_path + "/" + module_name,
parent_module + "." + module_name))
else:
module_names.append(parent_module + "." + module_name)
return module_names
def _import_modules(module_names):
imported_modules = []
for module_name in module_names:
module = importlib.import_module(module_name)
if hasattr(module, 'list_opts'):
print("Pulling options from module %s" % module.__name__)
imported_modules.append(module)
return imported_modules
def _append_config_options(imported_modules, config_options):
for module in imported_modules:
configs = module.list_opts()
for key, val in configs.items():
if key not in config_options:
config_options[key] = val
else:
config_options[key].extend(val)

View File

@ -1,78 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from urllib.parse import urlunsplit
from falcon import HTTPInvalidParam
from .base import BaseResource
from shipyard_airflow.airflow_client import AirflowClient
# We need to be able to add/delete connections so that we can create/delete
# connection endpoints that Airflow needs to connect to
class AirflowAddConnectionResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp, action, conn_id, protocol, host, port):
web_server_url = self.retrieve_config('base', 'web_server')
if action != 'add':
raise HTTPInvalidParam(
'Invalid Paremeters for Adding Airflow Connection', 'action')
# Concatenate to form the connection URL
netloc = ''.join([host, ':', port])
url = (protocol, netloc, '', '', '')
conn_uri = urlunsplit(url)
# Form the request URL towards Airflow
req_url = ('{}/admin/rest_api/api?api=connections&add=true&conn_id'
'={}&conn_uri={}'.format(web_server_url, conn_id, conn_uri))
airflow_client = AirflowClient(req_url)
self.on_success(resp, airflow_client.get())
# Delete a particular connection endpoint
class AirflowDeleteConnectionResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp, action, conn_id):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
if action != 'delete':
raise HTTPInvalidParam(
'Invalid Paremeters for Deleting Airflow Connection', 'action')
# Form the request URL towards Airflow
req_url = ('{}/admin/rest_api/api?api=connections&delete=true&conn_id'
'={}'.format(web_server_url, conn_id))
airflow_client = AirflowClient(req_url)
self.on_success(resp, airflow_client.get())
# List all current connection endpoints
class AirflowListConnectionsResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp, action):
web_server_url = self.retrieve_config('base', 'web_server')
if action != 'list':
raise HTTPInvalidParam(
'Invalid Paremeters for listing Airflow Connections', 'action')
req_url = '{}/admin/rest_api/api?api=connections&list=true'.format(
web_server_url)
airflow_client = AirflowClient(req_url)
self.on_success(resp, airflow_client.get())

View File

@ -1,44 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import requests
from .base import BaseResource
class GetDagStateResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp, dag_id, execution_date):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
req_url = ('{}/admin/rest_api/api?api=dag_state&dag_id={}'
'&execution_date={}'.format(web_server_url, dag_id,
execution_date))
response = requests.get(req_url).json()
if response["output"]["stderr"]:
resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"]
return
else:
resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"]

View File

@ -1,46 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import requests
from .base import BaseResource
class GetTaskStatusResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp, dag_id, task_id, execution_date):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
req_url = (
'{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}'
'&execution_date={}'.format(web_server_url, dag_id, task_id,
execution_date))
response = requests.get(req_url).json()
if response["output"]["stderr"]:
resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"]
return
else:
resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"]

View File

@ -1,44 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import requests
from .base import BaseResource
class GetAirflowVersionResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
# Get Airflow Version
req_url = '{}/admin/rest_api/api?api=version'.format(
web_server_url)
response = requests.get(req_url).json()
if response["output"]:
resp.status = falcon.HTTP_200
resp.body = response["output"]
else:
self.return_error(resp, falcon.HTTP_400,
'Fail to Retrieve Airflow Version')
return

View File

@ -1,44 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import requests
from .base import BaseResource
class ListDagsResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
# List available dags
req_url = '{}/admin/rest_api/api?api=list_dags'.format(
web_server_url)
response = requests.get(req_url).json()
if response["output"]["stderr"]:
resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"]
return
else:
resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"]

View File

@ -1,44 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import requests
from .base import BaseResource
class ListTasksResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp, dag_id):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
# Retrieve all tasks belonging to a particular Dag
req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format(
web_server_url, dag_id)
response = requests.get(req_url).json()
if response["output"]["stderr"]:
resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"]
return
else:
resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"]

View File

@ -1,54 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import requests
from dateutil.parser import parse
from .base import BaseResource
class TriggerDagRunResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp, dag_id, conf):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
# "conf" - JSON string that gets pickled into the DagRun's
# conf attribute
req_url = ('{}/admin/rest_api/api?api=trigger_dag&dag_id={}'
'&conf={}'.format(web_server_url, dag_id, conf))
response = requests.get(req_url).json()
# Returns error response if API call returns
# response code other than 200
if response["http_response_code"] != 200:
resp.status = falcon.HTTP_400
resp.body = response["output"]
return
else:
# Returns 201 if action is created successfully
resp.status = falcon.HTTP_201
# Return time of execution so that we can use
# it to query dag/task status
dt = parse(response["response_time"])
resp.body = dt.strftime('%Y-%m-%dT%H:%M:%S')

View File

@ -1,92 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import json
import requests
import time
import logging
from dateutil.parser import parse
from .base import BaseResource
class TriggerDagRunPollResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp, dag_id, run_id):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
req_url = ('{}/admin/rest_api/api?api=trigger_dag&dag_id={}'
'&run_id={}'.format(web_server_url, dag_id, run_id))
response = requests.get(req_url).json()
if response["http_response_code"] != 200:
resp.status = falcon.HTTP_400
resp.body = response["output"]
return
else:
resp.status = falcon.HTTP_200
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Executing '" + dag_id + "' Dag...")
# Retrieve time of execution so that we
# can use it to query dag/task status
dt = parse(response["response_time"])
exec_date = dt.strftime('%Y-%m-%dT%H:%M:%S')
url = ('{}/admin/rest_api/api'
'?api=dag_state&dag_id={}&execution_date={}'.format(
web_server_url, dag_id, exec_date))
# Back off for 5 seconds before querying the initial state
time.sleep(5)
dag_state = requests.get(url).json()
# Remove newline character at the end of the response
dag_state = dag_state["output"]["stdout"].encode(
'utf8').rstrip()
while dag_state != 'success':
# Get current state
dag_state = requests.get(url).json()
# Remove newline character at the end of the response
dag_state = dag_state["output"]["stdout"].encode(
'utf8').rstrip()
# Logs output of current dag state
logging.info('Current Dag State: ' + dag_state)
if dag_state == 'failed':
resp.status = falcon.HTTP_500
logging.info('Dag Execution Failed')
resp.body = json.dumps({
'Error': 'Dag Execution Failed'
})
return
# Wait for 20 seconds before doing a new query
time.sleep(20)
logging.info('Dag Successfully Executed')

View File

@ -0,0 +1,25 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#PasteDeploy Configuration File
#Used to configure uWSGI middleware pipeline
[app:shipyard-api]
paste.app_factory = shipyard_airflow.shipyard:paste_start_shipyard
[pipeline:main]
pipeline = authtoken shipyard-api
[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

View File

@ -11,28 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import json
import falcon
from shipyard_airflow.errors import AppError
from .regions import RegionsResource, RegionResource
from .base import ShipyardRequest, BaseResource
from .tasks import TaskResource
from .dag_runs import DagRunResource
from .airflow_get_task_status import GetTaskStatusResource
from .airflow_list_tasks import ListTasksResource
from .airflow_list_dags import ListDagsResource
from .airflow_dag_state import GetDagStateResource
from .airflow_trigger_dag import TriggerDagRunResource
from .airflow_trigger_dag_poll import TriggerDagRunPollResource
from .airflow_connections import AirflowAddConnectionResource
from .airflow_connections import AirflowDeleteConnectionResource
from .airflow_connections import AirflowListConnectionsResource
from .airflow_get_version import GetAirflowVersionResource
from .middleware import AuthMiddleware, ContextMiddleware, LoggingMiddleware
from shipyard_airflow.errors import AppError
from .health import HealthResource
def start_api():
middlewares = [
AuthMiddleware(),
@ -49,24 +36,6 @@ def start_api():
# API for managing region data
('/regions', RegionsResource()),
('/regions/{region_id}', RegionResource()),
('/dags/{dag_id}/tasks/{task_id}', TaskResource()),
('/dags/{dag_id}/dag_runs', DagRunResource()),
('/list_dags', ListDagsResource()),
('/task_state/dags/{dag_id}/tasks/{task_id}/execution_date/'
'{execution_date}', GetTaskStatusResource()),
('/dag_state/dags/{dag_id}/execution_date/{execution_date}',
GetDagStateResource()),
('/list_tasks/dags/{dag_id}', ListTasksResource()),
('/trigger_dag/dags/{dag_id}/conf/{conf}',
TriggerDagRunResource()),
('/trigger_dag/dags/{dag_id}/run_id/{run_id}/poll',
TriggerDagRunPollResource()),
('/connections/{action}/conn_id/{conn_id}/protocol/{protocol}'
'/host/{host}/port/{port}', AirflowAddConnectionResource()),
('/connections/{action}/conn_id/{conn_id}',
AirflowDeleteConnectionResource()),
('/connections/{action}', AirflowListConnectionsResource()),
('/airflow/version', GetAirflowVersionResource()),
('/health', HealthResource()),
]

View File

@ -12,11 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import falcon.request as request
import uuid
import json
import configparser
import os
import logging
try:
from collections import OrderedDict
except ImportError:
@ -30,8 +31,6 @@ from shipyard_airflow.errors import (
class BaseResource(object):
authorized_roles = []
def on_options(self, req, resp):
self_attrs = dir(self)
methods = ['GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'PATCH']
@ -44,16 +43,6 @@ class BaseResource(object):
resp.headers['Allow'] = ','.join(allowed_methods)
resp.status = falcon.HTTP_200
# By default, no one is authorized to use a resource
def authorize_roles(self, role_list):
authorized = set(self.authorized_roles)
applied = set(role_list)
if authorized.isdisjoint(applied):
return False
else:
return True
def to_json(self, body_dict):
return json.dumps(body_dict)
@ -99,6 +88,25 @@ class BaseResource(object):
else:
raise AppError(ERR_UNKNOWN, "Missing Configuration File")
def error(self, ctx, msg):
self.log_error(ctx, logging.ERROR, msg)
def info(self, ctx, msg):
self.log_error(ctx, logging.INFO, msg)
def log_error(self, ctx, level, msg):
extra = {
'user': 'N/A',
'req_id': 'N/A',
'external_ctx': 'N/A'
}
if ctx is not None:
extra = {
'user': ctx.user,
'req_id': ctx.request_id,
'external_ctx': ctx.external_marker,
}
class ShipyardRequestContext(object):
@ -108,6 +116,14 @@ class ShipyardRequestContext(object):
self.roles = ['anyone']
self.request_id = str(uuid.uuid4())
self.external_marker = None
self.project_id = None
self.user_id = None # User ID (UUID)
self.policy_engine = None
self.user_domain_id = None # Domain owning user
self.project_domain_id = None # Domain owning project
self.is_admin_project = False
self.authenticated = False
self.request_id = str(uuid.uuid4())
def set_log_level(self, level):
if level in ['error', 'info', 'debug']:
@ -116,6 +132,9 @@ class ShipyardRequestContext(object):
def set_user(self, user):
self.user = user
def set_project(self, project):
self.project = project
def add_role(self, role):
self.roles.append(role)
@ -127,7 +146,22 @@ class ShipyardRequestContext(object):
if x != role]
def set_external_marker(self, marker):
self.external_marker = str(marker)[:32]
self.external_marker = marker
class ShipyardRequest(request.Request):
def set_policy_engine(self, engine):
self.policy_engine = engine
def to_policy_view(self):
policy_dict = {}
policy_dict['user_id'] = self.user_id
policy_dict['user_domain_id'] = self.user_domain_id
policy_dict['project_id'] = self.project_id
policy_dict['project_domain_id'] = self.project_domain_id
policy_dict['roles'] = self.roles
policy_dict['is_admin_project'] = self.is_admin_project
return policy_dict
class ShipyardRequest(falcon.request.Request):
context_type = ShipyardRequestContext

View File

@ -1,54 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import requests
from .base import BaseResource
class DagRunResource(BaseResource):
authorized_roles = ['user']
def on_post(self,
req,
resp,
dag_id,
run_id=None,
conf=None,
execution_date=None):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
req_url = '{}/api/experimental/dags/{}/dag_runs'.format(
web_server_url, dag_id)
response = requests.post(
req_url,
json={
"run_id": run_id,
"conf": conf,
"execution_date": execution_date,
})
if response.ok:
resp.status = falcon.HTTP_200
else:
self.return_error(resp, falcon.HTTP_400, 'Fail to Execute Dag')
return

View File

@ -15,11 +15,8 @@ import falcon
from shipyard_airflow.control.base import BaseResource
class HealthResource(BaseResource):
authorized_roles = ['anyone']
# Return empty response/body to show
# that shipyard is healthy
def on_get(self, req, resp):

View File

@ -11,66 +11,90 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import logging
from uuid import UUID
from oslo_utils import uuidutils
from shipyard_airflow import policy
class AuthMiddleware(object):
def __init__(self):
self.logger = logging.getLogger('shipyard')
# Authentication
def process_request(self, req, resp):
ctx = req.context
token = req.get_header('X-Auth-Token')
ctx.set_policy_engine(policy.policy_engine)
user = self.validate_token(token)
for k, v in req.headers.items():
self.logger.debug("Request with header %s: %s" % (k, v))
if user is not None:
ctx.set_user(user)
user_roles = self.role_list(user)
ctx.add_roles(user_roles)
auth_status = req.get_header(
'X-SERVICE-IDENTITY-STATUS') # will be set to Confirmed or Invalid
service = True
if auth_status is None:
auth_status = req.get_header('X-IDENTITY-STATUS')
service = False
if auth_status == 'Confirmed':
# Process account and roles
ctx.authenticated = True
# User Identity, unique within owning domain
ctx.user = req.get_header(
'X-SERVICE-USER-NAME') if service else req.get_header(
'X-USER-NAME')
# Identity-service managed unique identifier
ctx.user_id = req.get_header(
'X-SERVICE-USER-ID') if service else req.get_header(
'X-USER-ID')
# Identity service managed unique identifier of owning domain of
# user name
ctx.user_domain_id = req.get_header(
'X-SERVICE-USER-DOMAIN-ID') if service else req.get_header(
'X-USER-DOMAIN-ID')
# Identity service managed unique identifier
ctx.project_id = req.get_header(
'X-SERVICE-PROJECT-ID') if service else req.get_header(
'X-PROJECT-ID')
# Name of owning domain of project
ctx.project_domain_id = req.get_header(
'X-SERVICE-PROJECT-DOMAIN-ID') if service else req.get_header(
'X-PROJECT-DOMAIN-NAME')
if service:
# comma delimieted list of case-sensitive role names
ctx.add_roles(req.get_header('X-SERVICE-ROLES').split(','))
else:
ctx.add_roles(req.get_header('X-ROLES').split(','))
if req.get_header('X-IS-ADMIN-PROJECT') == 'True':
ctx.is_admin_project = True
else:
ctx.is_admin_project = False
self.logger.debug(
'Request from authenticated user %s with roles %s' %
(ctx.user, ','.join(ctx.roles)))
else:
ctx.add_role('anyone')
# Authorization
def process_resource(self, req, resp, resource, params):
ctx = req.context
if not resource.authorize_roles(ctx.roles):
raise falcon.HTTPUnauthorized(
'Authentication required',
('This resource requires an authorized role.'))
# Return the username associated with an authenticated token or None
def validate_token(self, token):
if token == '10':
return 'shipyard'
elif token == 'admin':
return 'admin'
else:
return None
# Return the list of roles assigned to the username
# Roles need to be an enum
def role_list(self, username):
if username == 'shipyard':
return ['user']
elif username == 'admin':
return ['user', 'admin']
ctx.authenticated = False
class ContextMiddleware(object):
def __init__(self):
# Setup validation pattern for external marker
try:
uuid_value = uuidutils.generate_uuid(dashed=True)
UUID(uuid_value)
except:
self.logger.error('UUID generation fail')
def process_request(self, req, resp):
ctx = req.context
requested_logging = req.get_header('X-Log-Level')
if requested_logging == 'DEBUG' and 'admin' in ctx.roles:
ctx.set_log_level('debug')
elif requested_logging == 'INFO':
ctx.set_log_level('info')
ext_marker = req.get_header('X-Context-Marker')
ctx.set_external_marker(ext_marker if ext_marker is not None else '')
if ext_marker is not None and self.marker_re.fullmatch(ext_marker):
ctx.set_external_marker(ext_marker)
class LoggingMiddleware(object):

View File

@ -14,19 +14,16 @@
import falcon
from .base import BaseResource
from shipyard_airflow import policy
class RegionsResource(BaseResource):
authorized_roles = ['user']
@policy.ApiEnforcer('workflow_orchestrator:get_regions')
def on_get(self, req, resp):
resp.status = falcon.HTTP_200
class RegionResource(BaseResource):
authorized_roles = ['user']
@policy.ApiEnforcer('workflow_orchestrator:get_regions')
def on_get(self, req, resp, region_id):
resp.status = falcon.HTTP_200

View File

@ -1,36 +0,0 @@
[base]
web_server=http://localhost:32080
postgresql_db = postgresql+psycopg2://postgresql.ucp:5432/shipyard
postgresql_airflow_db = postgresql+psycopg2://postgresql.ucp:5432/airflow
[shipyard]
host=shipyard-int.ucp
port=9000
[deckhand]
host=deckhand-api.ucp
port=80
[armada]
host=armada-api.ucp
port=8000
[drydock]
host=drydock-api.ucp
port=9000
token=bigboss
site_yaml=/usr/local/airflow/plugins/drydock.yaml
prom_yaml=/usr/local/airflow/plugins/promenade.yaml
[keystone]
OS_AUTH_URL=http://keystone-api.ucp:80/v3
OS_PROJECT_NAME=service
OS_USER_DOMAIN_NAME=Default
OS_USERNAME=shipyard
OS_PASSWORD=password
OS_REGION_NAME=RegionOne
OS_IDENTITY_API_VERSION=3
[healthcheck]
schema=http
endpoint=/api/v1.0/health

View File

@ -0,0 +1,320 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[base]
web_server=http://localhost:32080
postgresql_db = postgresql+psycopg2://postgresql.ucp:5432/shipyard
postgresql_airflow_db = postgresql+psycopg2://postgresql.ucp:5432/airflow
[shipyard]
host=shipyard-int.ucp
port=9000
[deckhand]
host=deckhand-api.ucp
port=80
[armada]
host=armada-api.ucp
port=8000
[drydock]
host=drydock-api.ucp
port=9000
token=bigboss
site_yaml=/usr/local/airflow/plugins/drydock.yaml
prom_yaml=/usr/local/airflow/plugins/promenade.yaml
[keystone]
OS_AUTH_URL=http://keystone-api.ucp:80/v3
OS_PROJECT_NAME=service
OS_USER_DOMAIN_NAME=Default
OS_USERNAME=shipyard
OS_PASSWORD=password
OS_REGION_NAME=RegionOne
OS_IDENTITY_API_VERSION=3
[healthcheck]
schema=http
endpoint=/api/v1.0/health
[keystone_authtoken]
#
# From keystonemiddleware.auth_token
#
# Complete "public" Identity API endpoint. This endpoint should not be an
# "admin" endpoint, as it should be accessible by all end users. Unauthenticated
# clients are redirected to this endpoint to authenticate. Although this
# endpoint should  ideally be unversioned, client support in the wild varies.
# If you're using a versioned v2 endpoint here, then this  should *not* be the
# same endpoint the service user utilizes  for validating tokens, because normal
# end users may not be  able to reach that endpoint. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.auth_uri
auth_uri = http://keystone-api.openstack:80/v3
# API version of the admin Identity API endpoint. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.auth_version
#auth_version = <None>
# Do not handle authorization requests within the middleware, but delegate the
# authorization decision to downstream WSGI components. (boolean value)
# from .keystone_authtoken.keystonemiddleware.auth_token.delay_auth_decision
delay_auth_decision = true
# Request timeout value for communicating with Identity API server. (integer
# value)
# from .keystone_authtoken.keystonemiddleware.auth_token.http_connect_timeout
#http_connect_timeout = <None>
# How many times are we trying to reconnect when communicating with Identity API
# Server. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.http_request_max_retries
#http_request_max_retries = 3
# Request environment key where the Swift cache object is stored. When
# auth_token middleware is deployed with a Swift cache, use this option to have
# the middleware share a caching backend with swift. Otherwise, use the
# ``memcached_servers`` option instead. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.cache
#cache = <None>
# Required if identity server requires client certificate (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.certfile
#certfile = <None>
# Required if identity server requires client certificate (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.keyfile
#keyfile = <None>
# A PEM encoded Certificate Authority to use when verifying HTTPs connections.
# Defaults to system CAs. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.cafile
#cafile = <None>
# Verify HTTPS connections. (boolean value)
# from .keystone_authtoken.keystonemiddleware.auth_token.insecure
#insecure = false
# The region in which the identity server can be found. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.region_name
#region_name = <None>
# Directory used to cache files related to PKI tokens. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.signing_dir
#signing_dir = <None>
# Optionally specify a list of memcached server(s) to use for caching. If left
# undefined, tokens will instead be cached in-process. (list value)
# Deprecated group/name - [keystone_authtoken]/memcache_servers
# from .keystone_authtoken.keystonemiddleware.auth_token.memcached_servers
#memcached_servers = <None>
# In order to prevent excessive effort spent validating tokens, the middleware
# caches previously-seen tokens for a configurable duration (in seconds). Set to
# -1 to disable caching completely. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.token_cache_time
#token_cache_time = 300
# Determines the frequency at which the list of revoked tokens is retrieved from
# the Identity service (in seconds). A high number of revocation events combined
# with a low cache duration may significantly reduce performance. Only valid for
# PKI tokens. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.revocation_cache_time
#revocation_cache_time = 10
# (Optional) If defined, indicate whether token data should be authenticated or
# authenticated and encrypted. If MAC, token data is authenticated (with HMAC)
# in the cache. If ENCRYPT, token data is encrypted and authenticated in the
# cache. If the value is not one of these options or empty, auth_token will
# raise an exception on initialization. (string value)
# Allowed values: None, MAC, ENCRYPT
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_security_strategy
#memcache_security_strategy = None
# (Optional, mandatory if memcache_security_strategy is defined) This string is
# used for key derivation. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_secret_key
#memcache_secret_key = <None>
# (Optional) Number of seconds memcached server is considered dead before it is
# tried again. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_dead_retry
#memcache_pool_dead_retry = 300
# (Optional) Maximum total number of open connections to every memcached server.
# (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_maxsize
#memcache_pool_maxsize = 10
# (Optional) Socket timeout in seconds for communicating with a memcached
# server. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_socket_timeout
#memcache_pool_socket_timeout = 3
# (Optional) Number of seconds a connection to memcached is held unused in the
# pool before it is closed. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_unused_timeout
#memcache_pool_unused_timeout = 60
# (Optional) Number of seconds that an operation will wait to get a memcached
# client connection from the pool. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_conn_get_timeout
#memcache_pool_conn_get_timeout = 10
# (Optional) Use the advanced (eventlet safe) memcached client pool. The
# advanced pool will only work under python 2.x. (boolean value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_use_advanced_pool
#memcache_use_advanced_pool = false
# (Optional) Indicate whether to set the X-Service-Catalog header. If False,
# middleware will not ask for service catalog on token validation and will not
# set the X-Service-Catalog header. (boolean value)
# from .keystone_authtoken.keystonemiddleware.auth_token.include_service_catalog
#include_service_catalog = true
# Used to control the use and type of token binding. Can be set to: "disabled"
# to not check token binding. "permissive" (default) to validate binding
# information if the bind type is of a form known to the server and ignore it if
# not. "strict" like "permissive" but if the bind type is unknown the token will
# be rejected. "required" any form of token binding is needed to be allowed.
# Finally the name of a binding method that must be present in tokens. (string
# value)
# from .keystone_authtoken.keystonemiddleware.auth_token.enforce_token_bind
#enforce_token_bind = permissive
# If true, the revocation list will be checked for cached tokens. This requires
# that PKI tokens are configured on the identity server. (boolean value)
# from .keystone_authtoken.keystonemiddleware.auth_token.check_revocations_for_cached
#check_revocations_for_cached = false
# Hash algorithms to use for hashing PKI tokens. This may be a single algorithm
# or multiple. The algorithms are those supported by Python standard
# hashlib.new(). The hashes will be tried in the order given, so put the
# preferred one first for performance. The result of the first hash will be
# stored in the cache. This will typically be set to multiple values only while
# migrating from a less secure algorithm to a more secure one. Once all the old
# tokens are expired this option should be set to a single value for better
# performance. (list value)
# from .keystone_authtoken.keystonemiddleware.auth_token.hash_algorithms
#hash_algorithms = md5
# Authentication type to load (string value)
# Deprecated group/name - [keystone_authtoken]/auth_plugin
# from .keystone_authtoken.keystonemiddleware.auth_token.auth_type
auth_type = password
# Config Section from which to load plugin specific options (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.auth_section
auth_section = keystone_authtoken
#
# From shipyard_orchestrator
#
# Authentication URL (string value)
# from .keystone_authtoken.shipyard_orchestrator.auth_url
auth_url = http://keystone-api.openstack:80/v3
# Domain ID to scope to (string value)
# from .keystone_authtoken.shipyard_orchestrator.domain_id
#domain_id = <None>
# Domain name to scope to (string value)
# from .keystone_authtoken.shipyard_orchestrator.domain_name
#domain_name = <None>
# Project ID to scope to (string value)
# Deprecated group/name - [keystone_authtoken]/tenant-id
# from .keystone_authtoken.shipyard_orchestrator.project_id
#project_id = <None>
# Project name to scope to (string value)
# Deprecated group/name - [keystone_authtoken]/tenant-name
# from .keystone_authtoken.shipyard_orchestrator.project_name
project_name = service
# Domain ID containing project (string value)
# from .keystone_authtoken.shipyard_orchestrator.project_domain_id
#project_domain_id = <None>
# Domain name containing project (string value)
# from .keystone_authtoken.shipyard_orchestrator.project_domain_name
project_domain_name = default
# Trust ID (string value)
# from .keystone_authtoken.shipyard_orchestrator.trust_id
#trust_id = <None>
# Optional domain ID to use with v3 and v2 parameters. It will be used for both
# the user and project domain in v3 and ignored in v2 authentication. (string
# value)
# from .keystone_authtoken.shipyard_orchestrator.default_domain_id
#default_domain_id = <None>
# Optional domain name to use with v3 API and v2 parameters. It will be used for
# both the user and project domain in v3 and ignored in v2 authentication.
# (string value)
# from .keystone_authtoken.shipyard_orchestrator.default_domain_name
#default_domain_name = <None>
# User id (string value)
# from .keystone_authtoken.shipyard_orchestrator.user_id
#user_id = <None>
# Username (string value)
# Deprecated group/name - [keystone_authtoken]/user-name
# from .keystone_authtoken.shipyard_orchestrator.username
username = shipyard
# User's domain id (string value)
# from .keystone_authtoken.shipyard_orchestrator.user_domain_id
#user_domain_id = <None>
# User's domain name (string value)
# from .keystone_authtoken.shipyard_orchestrator.user_domain_name
user_domain_name = default
# User's password (string value)
# from .keystone_authtoken.shipyard_orchestrator.password
password = password
[oslo_policy]
#
# From oslo.policy
#
# The file that defines policies. (string value)
# Deprecated group/name - [DEFAULT]/policy_file
# from .oslo_policy.oslo.policy.policy_file
#policy_file = policy.json
# Default rule. Enforced when a requested rule is not found. (string value)
# Deprecated group/name - [DEFAULT]/policy_default_rule
# from .oslo_policy.oslo.policy.policy_default_rule
#policy_default_rule = default
# Directories where policy configuration files are stored. They can be relative
# to any directory in the search path defined by the config_dir option, or
# absolute paths. The file defined by policy_file must exist for these
# directories to be searched.  Missing or empty directories are ignored. (multi
# valued)
# Deprecated group/name - [DEFAULT]/policy_dirs
# from .oslo_policy.oslo.policy.policy_dirs (multiopt)
#policy_dirs = policy.d

View File

@ -1,44 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import json
import requests
from .base import BaseResource
class TaskResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp, dag_id, task_id):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(
web_server_url, dag_id, task_id)
task_details = requests.get(req_url).json()
if 'error' in task_details:
resp.status = falcon.HTTP_400
resp.body = json.dumps(task_details)
return
else:
resp.status = falcon.HTTP_200
resp.body = json.dumps(task_details)

View File

@ -8,10 +8,7 @@ try:
except ImportError:
OrderedDict = dict
ERR_UNKNOWN = {
'status': falcon.HTTP_500,
'title': 'Internal Server Error'
}
ERR_UNKNOWN = {'status': falcon.HTTP_500, 'title': 'Internal Server Error'}
ERR_AIRFLOW_RESPONSE = {
'status': falcon.HTTP_400,

122
shipyard_airflow/policy.py Normal file
View File

@ -0,0 +1,122 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import functools
import falcon
from oslo_config import cfg
from oslo_policy import policy
policy_engine = None
class ShipyardPolicy(object):
"""
Initialize policy defaults
"""
# Base Policy
base_rules = [
policy.RuleDefault(
'admin_required',
'role:admin',
description='Actions requiring admin authority'),
]
# Orchestrator Policy
task_rules = [
policy.DocumentedRuleDefault('workflow_orchestrator:get_regions',
'role:admin', 'Get region information', [{
'path':
'/api/v1.0/regions',
'method':
'GET'
}, {
'path':
'/api/v1.0/regions/{region_id}',
'method':
'GET'
}])
]
# Regions Policy
def __init__(self):
self.enforcer = policy.Enforcer(cfg.CONF)
def register_policy(self):
self.enforcer.register_defaults(ShipyardPolicy.base_rules)
self.enforcer.register_defaults(ShipyardPolicy.task_rules)
def authorize(self, action, ctx):
target = {'project_id': ctx.project_id, 'user_id': ctx.user_id}
self.enforcer.authorize(action, target, ctx.to_policy_view())
return self.enforcer.authorize(action, target, ctx.to_policy_view())
class ApiEnforcer(object):
"""
A decorator class for enforcing RBAC policies
"""
def __init__(self, action):
self.action = action
self.logger = logging.getLogger('shipyard.policy')
def __call__(self, f):
@functools.wraps(f)
def secure_handler(slf, req, resp, *args, **kwargs):
ctx = req.context
policy_engine = ctx.policy_engine
self.logger.debug("Enforcing policy %s on request %s" %
(self.action, ctx.request_id))
try:
if policy_engine is not None and policy_engine.authorize(
self.action, ctx):
return f(slf, req, resp, *args, **kwargs)
else:
if ctx.authenticated:
slf.info(ctx, "Error - Forbidden access - action: %s" %
self.action)
slf.return_error(
resp,
falcon.HTTP_403,
message="Forbidden",
retry=False)
else:
slf.info(ctx, "Error - Unauthenticated access")
slf.return_error(
resp,
falcon.HTTP_401,
message="Unauthenticated",
retry=False)
except:
slf.info(
ctx,
"Error - Expectation Failed - action: %s" % self.action)
slf.return_error(
resp,
falcon.HTTP_417,
message="Expectation Failed",
retry=False)
return secure_handler
def list_policies():
default_policy = []
default_policy.extend(ShipyardPolicy.base_rules)
default_policy.extend(ShipyardPolicy.task_rules)
return default_policy

21
shipyard_airflow/shipyard.py Normal file → Executable file
View File

@ -12,11 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from oslo_config import cfg
from shipyard_airflow import policy
import shipyard_airflow.control.api as api
# We need to import config so the initializing code can run for oslo config
import shipyard_airflow.config as config # noqa: F401
def start_shipyard():
# Setup configuration parsing
cli_options = [
cfg.BoolOpt(
'debug', short='d', default=False, help='Enable debug logging'),
]
# Setup root logger
logger = logging.getLogger('shipyard')
@ -37,7 +49,16 @@ def start_shipyard():
ch.setFormatter(formatter)
logger.addHandler(ch)
# Setup the RBAC policy enforcer
policy.policy_engine = policy.ShipyardPolicy()
policy.policy_engine.register_policy()
return api.start_api()
# Initialization compatible with PasteDeploy
def paste_start_shipyard(global_conf, **kwargs):
return shipyard
shipyard = start_shipyard()

View File

@ -8,4 +8,4 @@ apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.8.1
flake8==3.3.0
# Security scanning
bandit>=1.1.0 # Apache-2.0
bandit>=1.1.0 # Apache-2.0

View File

@ -1,267 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from falcon import testing
import mock
from shipyard_airflow.control import api
from shipyard_airflow.control.airflow_connections import (
AirflowAddConnectionResource,
AirflowDeleteConnectionResource,
AirflowListConnectionsResource,
)
class BaseTesting(testing.TestCase):
def setUp(self):
super().setUp()
self.app = api.start_api()
self.conn_id = 1
self.protocol = 'http'
self.host = '10.0.0.1'
self.port = '3000'
@property
def _headers(self):
return {
'X-Auth-Token': '10'
}
class AirflowAddConnectionResourceTestCase(BaseTesting):
def setUp(self):
super().setUp()
self.action = 'add'
@property
def _url(self):
return ('/api/v1.0/connections/{}/conn_id/{}/'
'protocol/{}/host/{}/port/{}'.format(
self.action, self.conn_id,
self.protocol, self.host, self.port))
def test_on_get_missing_config_file(self):
doc = {
'description': 'Missing Configuration File',
'message': 'Internal Server Error'
}
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_500
@mock.patch.object(AirflowAddConnectionResource, 'retrieve_config')
def test_on_get_invalid_action(self, mock_config):
self.action = 'invalid_action'
doc = {
'title': 'Invalid parameter',
'description': ('The "action" parameter is invalid.'
' Invalid Paremeters for Adding Airflow'
' Connection')
}
mock_config.return_value = 'some_url'
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_400
mock_config.assert_called_once_with('base', 'web_server')
@mock.patch('shipyard_airflow.airflow_client.requests')
@mock.patch.object(AirflowAddConnectionResource, 'retrieve_config')
def test_on_get_airflow_error(self, mock_config, mock_requests):
doc = {
'message': 'Error response from Airflow',
'description': "can't add connections in airflow"
}
mock_response = {
'output': {
'stderr': "can't add connections in airflow"
}
}
mock_requests.get.return_value.json.return_value = mock_response
mock_config.return_value = 'some_url'
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_400
mock_config.assert_called_once_with('base', 'web_server')
@mock.patch('shipyard_airflow.airflow_client.requests')
@mock.patch.object(AirflowAddConnectionResource, 'retrieve_config')
def test_on_get_airflow_success(self, mock_config, mock_requests):
doc = {
'type': 'success',
'message': 'Airflow Success',
}
mock_response = {
'output': {
'stderr': None,
'stdout': 'Airflow Success'
}
}
mock_requests.get.return_value.json.return_value = mock_response
mock_config.return_value = 'some_url'
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_200
mock_config.assert_called_once_with('base', 'web_server')
class AirflowDeleteConnectionResource(BaseTesting):
def setUp(self):
self.action = 'delete'
super().setUp()
@property
def _url(self):
return '/api/v1.0/connections/{}/conn_id/{}'.format(
self.action, self.conn_id)
def test_on_get_missing_config_file(self):
doc = {
'description': 'Missing Configuration File',
'message': 'Internal Server Error'
}
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_500
@mock.patch.object(AirflowDeleteConnectionResource, 'retrieve_config')
def test_on_get_invalid_action(self, mock_config):
self.action = 'invalid_action'
doc = {
'title': 'Invalid parameter',
'description': ('The "action" parameter is invalid.'
' Invalid Paremeters for Deleting Airflow'
' Connection')
}
mock_config.return_value = 'some_url'
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_400
mock_config.assert_called_once_with('base', 'web_server')
@mock.patch('shipyard_airflow.airflow_client.requests')
@mock.patch.object(AirflowDeleteConnectionResource, 'retrieve_config')
def test_on_get_airflow_error(self, mock_config, mock_requests):
doc = {
'message': 'Error response from Airflow',
'description': "can't delete connections in airflow"
}
mock_response = {
'output': {
'stderr': "can't delete connections in airflow"
}
}
mock_requests.get.return_value.json.return_value = mock_response
mock_config.return_value = 'some_url'
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_400
mock_config.assert_called_once_with('base', 'web_server')
@mock.patch('shipyard_airflow.airflow_client.requests')
@mock.patch.object(AirflowDeleteConnectionResource, 'retrieve_config')
def test_on_get_airflow_success(self, mock_config, mock_requests):
doc = {
'type': 'success',
'message': 'Airflow Success',
}
mock_response = {
'output': {
'stderr': None,
'stdout': 'Airflow Success'
}
}
mock_requests.get.return_value.json.return_value = mock_response
mock_config.return_value = 'some_url'
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_200
mock_config.assert_called_once_with('base', 'web_server')
class AirflowListConnectionsResource(BaseTesting):
def setUp(self):
self.action = 'list'
super().setUp()
@property
def _url(self):
return '/api/v1.0/connections/{}'.format(self.action)
def test_on_get_missing_config_file(self):
doc = {
'description': 'Missing Configuration File',
'message': 'Internal Server Error'
}
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_500
@mock.patch.object(AirflowListConnectionsResource, 'retrieve_config')
def test_on_get_invalid_action(self, mock_config):
self.action = 'invalid_action'
doc = {
'title': 'Invalid parameter',
'description': ('The "action" parameter is invalid.'
' Invalid Paremeters for listing Airflow'
' Connections')
}
mock_config.return_value = 'some_url'
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_400
mock_config.assert_called_once_with('base', 'web_server')
@mock.patch('shipyard_airflow.airflow_client.requests')
@mock.patch.object(AirflowListConnectionsResource, 'retrieve_config')
def test_on_get_airflow_error(self, mock_config, mock_requests):
doc = {
'message': 'Error response from Airflow',
'description': "can't list connections in airlfow"
}
mock_response = {
'output': {
'stderr': "can't list connections in airlfow"
}
}
mock_requests.get.return_value.json.return_value = mock_response
mock_config.return_value = 'some_url'
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_400
mock_config.assert_called_once_with('base', 'web_server')
@mock.patch('shipyard_airflow.airflow_client.requests')
@mock.patch.object(AirflowListConnectionsResource, 'retrieve_config')
def test_on_get_airflow_success(self, mock_config, mock_requests):
doc = {
'type': 'success',
'message': 'Airflow Success',
}
mock_response = {
'output': {
'stderr': None,
'stdout': 'Airflow Success'
}
}
mock_requests.get.return_value.json.return_value = mock_response
mock_config.return_value = 'some_url'
result = self.simulate_get(self._url, headers=self._headers)
assert result.json == doc
assert result.status == falcon.HTTP_200
mock_config.assert_called_once_with('base', 'web_server')