Add CM API lib into CDH plugin codes

We open a directory named "client" and add codes in it to support
API callings from CDH plugin codes, so that we can remove the
dependency on third party package cm_api. Most codes in client are
copied from cm_api source codes. We just changed some import
relationships, removed unused codes, and regulated a little to
make it pass pep8 test.

implements bp: add-lib-subset-cm-api

Change-Id: I73a66520f27271d628d1997c84c62140acb4aa54
This commit is contained in:
Ken Chen 2015-02-05 10:52:41 +08:00
parent 6647137eee
commit e5ea6f4483
21 changed files with 2291 additions and 63 deletions

View File

View File

@ -0,0 +1,119 @@
# Copyright (c) 2014 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The contents of this file are mainly copied from cm_api sources,
# released by Cloudrea. Codes not used by Sahara CDH plugin are removed.
# You can find the original codes at
#
# https://github.com/cloudera/cm_api/tree/master/python/src/cm_api
#
# To satisfy the pep8 and python3 tests, we did some changes to the codes.
# We also change some importings to use Sahara inherited classes.
from sahara.plugins.cdh.client import clusters
from sahara.plugins.cdh.client import cms
from sahara.plugins.cdh.client import hosts
from sahara.plugins.cdh.client import http_client
from sahara.plugins.cdh.client import resource
API_AUTH_REALM = "Cloudera Manager"
API_CURRENT_VERSION = 8
class ApiResource(resource.Resource):
"""Top-level API Resource
Resource object that provides methods for managing the top-level API
resources.
"""
def __init__(self, server_host, server_port=None,
username="admin", password="admin",
use_tls=False, version=API_CURRENT_VERSION):
"""Creates a Resource object that provides API endpoints.
:param server_host: The hostname of the Cloudera Manager server.
:param server_port: The port of the server. Defaults to 7180 (http) or
7183 (https).
:param username: Login name.
:param password: Login password.
:param use_tls: Whether to use tls (https).
:param version: API version.
:return: Resource object referring to the root.
"""
self._version = version
protocol = "https" if use_tls else "http"
if server_port is None:
server_port = 7183 if use_tls else 7180
base_url = ("%s://%s:%s/api/v%s"
% (protocol, server_host, server_port, version))
client = http_client.HttpClient(base_url)
client.set_basic_auth(username, password, API_AUTH_REALM)
client.set_headers({"Content-Type": "application/json"})
resource.Resource.__init__(self, client)
@property
def version(self):
"""Returns the API version (integer) being used."""
return self._version
def get_cloudera_manager(self):
"""Returns a Cloudera Manager object."""
return cms.ClouderaManager(self)
def create_cluster(self, name, version=None, fullVersion=None):
"""Create a new cluster
:param name: Cluster name.
:param version: Cluster major CDH version, e.g. 'CDH5'. Ignored if
fullVersion is specified.
:param fullVersion: Complete CDH version, e.g. '5.1.2'. Overrides major
version if both specified.
:return: The created cluster.
"""
return clusters.create_cluster(self, name, version, fullVersion)
def get_all_clusters(self, view=None):
"""Retrieve a list of all clusters
:param view: View to materialize ('full' or 'summary').
:return: A list of ApiCluster objects.
"""
return clusters.get_all_clusters(self, view)
def get_cluster(self, name):
"""Look up a cluster by name
:param name: Cluster name.
:return: An ApiCluster object.
"""
return clusters.get_cluster(self, name)
def delete_host(self, host_id):
"""Delete a host by id
:param host_id: Host id
:return: The deleted ApiHost object
"""
return hosts.delete_host(self, host_id)
def get_all_hosts(self, view=None):
"""Get all hosts
:param view: View to materialize ('full' or 'summary').
:return: A list of ApiHost objects.
"""
return hosts.get_all_hosts(self, view)

View File

@ -0,0 +1,166 @@
# Copyright (c) 2014 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The contents of this file are mainly copied from cm_api sources,
# released by Cloudrea. Codes not used by Sahara CDH plugin are removed.
# You can find the original codes at
#
# https://github.com/cloudera/cm_api/tree/master/python/src/cm_api
#
# To satisfy the pep8 and python3 tests, we did some changes to the codes.
# We also change some importings to use Sahara inherited classes.
from sahara.i18n import _
from sahara.plugins.cdh.client import services
from sahara.plugins.cdh.client import types
from sahara.plugins.cdh import exceptions as ex
CLUSTERS_PATH = "/clusters"
def create_cluster(resource_root, name, version=None, fullVersion=None):
"""Create a cluster
:param resource_root: The root Resource object.
:param name: Cluster name
:param version: Cluster CDH major version (eg: "CDH4")
- The CDH minor version will be assumed to be the
latest released version for CDH4, or 5.0 for CDH5.
:param fullVersion: Cluster's full CDH version. (eg: "5.1.1")
- If specified, 'version' will be ignored.
- Since: v6
:return: An ApiCluster object
"""
if version is None and fullVersion is None:
raise ex.CMApiVersionError(
_("Either 'version' or 'fullVersion' must be specified"))
if fullVersion is not None:
api_version = 6
version = None
else:
api_version = 1
apicluster = ApiCluster(resource_root, name, version, fullVersion)
return types.call(resource_root.post, CLUSTERS_PATH, ApiCluster, True,
data=[apicluster], api_version=api_version)[0]
def get_cluster(resource_root, name):
"""Lookup a cluster by name
:param resource_root: The root Resource object.
:param name: Cluster name
:return: An ApiCluster object
"""
return types.call(resource_root.get, "%s/%s"
% (CLUSTERS_PATH, name), ApiCluster)
def get_all_clusters(resource_root, view=None):
"""Get all clusters
:param resource_root: The root Resource object.
:return: A list of ApiCluster objects.
"""
return types.call(resource_root.get, CLUSTERS_PATH, ApiCluster, True,
params=(dict(view=view) if view else None))
class ApiCluster(types.BaseApiResource):
_ATTRIBUTES = {
'name': None,
'displayName': None,
'version': None,
'fullVersion': None,
'maintenanceMode': types.ROAttr(),
'maintenanceOwners': types.ROAttr(),
}
def __init__(self, resource_root, name=None, version=None,
fullVersion=None):
types.BaseApiObject.init(self, resource_root, locals())
def _path(self):
return "%s/%s" % (CLUSTERS_PATH, self.name)
def get_service_types(self):
"""Get all service types supported by this cluster
:return: A list of service types (strings)
"""
resp = self._get_resource_root().get(self._path() + '/serviceTypes')
return resp[types.ApiList.LIST_KEY]
def get_commands(self, view=None):
"""Retrieve a list of running commands for this cluster
:param view: View to materialize ('full' or 'summary')
:return: A list of running commands.
"""
return self._get("commands", types.ApiCommand, True,
params=(dict(view=view) if view else None))
def create_service(self, name, service_type):
"""Create a service
:param name: Service name
:param service_type: Service type
:return: An ApiService object
"""
return services.create_service(self._get_resource_root(), name,
service_type, self.name)
def get_service(self, name):
"""Lookup a service by name
:param name: Service name
:return: An ApiService object
"""
return services.get_service(self._get_resource_root(),
name, self.name)
def start(self):
"""Start all services in a cluster, respecting dependencies
:return: Reference to the submitted command.
"""
return self._cmd('start')
def deploy_client_config(self):
"""Deploys Service client configuration to the hosts on the cluster
:return: Reference to the submitted command.
:since: API v2
"""
return self._cmd('deployClientConfig')
def first_run(self):
"""Prepare and start services in a cluster
Perform all the steps needed to prepare each service in a
cluster and start the services in order.
:return: Reference to the submitted command.
:since: API v7
"""
return self._cmd('firstRun', None, api_version=7)
def remove_host(self, hostId):
"""Removes the association of the host with the cluster
:return: A ApiHostRef of the host that was removed.
:since: API v3
"""
return self._delete("hosts/" + hostId, types.ApiHostRef, api_version=3)

View File

@ -0,0 +1,62 @@
# Copyright (c) 2014 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The contents of this file are mainly copied from cm_api sources,
# released by Cloudrea. Codes not used by Sahara CDH plugin are removed.
# You can find the original codes at
#
# https://github.com/cloudera/cm_api/tree/master/python/src/cm_api
#
# To satisfy the pep8 and python3 tests, we did some changes to the codes.
# We also change some importings to use Sahara inherited classes.
from sahara.plugins.cdh.client.services import ApiService
from sahara.plugins.cdh.client import types
class ClouderaManager(types.BaseApiResource):
"""The Cloudera Manager instance
Provides access to CM configuration and services.
"""
def __init__(self, resource_root):
types.BaseApiObject.init(self, resource_root)
def _path(self):
return '/cm'
def create_mgmt_service(self, service_setup_info):
"""Setup the Cloudera Management Service
:param service_setup_info: ApiServiceSetupInfo object.
:return: The management service instance.
"""
return self._put("service", ApiService, data=service_setup_info)
def get_service(self):
"""Return the Cloudera Management Services instance
:return: An ApiService instance.
"""
return self._get("service", ApiService)
def hosts_start_roles(self, host_names):
"""Start all the roles on the specified hosts
:param host_names: List of names of hosts on which to start all roles.
:return: Information about the submitted command.
:since: API v2
"""
return self._cmd('hostsStartRoles', data=host_names)

View File

@ -0,0 +1,87 @@
# Copyright (c) 2014 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The contents of this file are mainly copied from cm_api sources,
# released by Cloudrea. Codes not used by Sahara CDH plugin are removed.
# You can find the original codes at
#
# https://github.com/cloudera/cm_api/tree/master/python/src/cm_api
#
# To satisfy the pep8 and python3 tests, we did some changes to the codes.
# We also change some importings to use Sahara inherited classes.
import datetime
from sahara.plugins.cdh.client import types
HOSTS_PATH = "/hosts"
def get_all_hosts(resource_root, view=None):
"""Get all hosts
:param resource_root: The root Resource object.
:return: A list of ApiHost objects.
"""
return types.call(resource_root.get, HOSTS_PATH, ApiHost, True,
params=(dict(view=view) if view else None))
def delete_host(resource_root, host_id):
"""Delete a host by id
:param resource_root: The root Resource object.
:param host_id: Host id
:return: The deleted ApiHost object
"""
return types.call(resource_root.delete, "%s/%s"
% (HOSTS_PATH, host_id), ApiHost)
class ApiHost(types.BaseApiResource):
_ATTRIBUTES = {
'hostId': None,
'hostname': None,
'ipAddress': None,
'rackId': None,
'status': types.ROAttr(),
'lastHeartbeat': types.ROAttr(datetime.datetime),
'roleRefs': types.ROAttr(types.ApiRoleRef),
'healthSummary': types.ROAttr(),
'healthChecks': types.ROAttr(),
'hostUrl': types.ROAttr(),
'commissionState': types.ROAttr(),
'maintenanceMode': types.ROAttr(),
'maintenanceOwners': types.ROAttr(),
'numCores': types.ROAttr(),
'totalPhysMemBytes': types.ROAttr(),
}
def __init__(self, resource_root, hostId=None, hostname=None,
ipAddress=None, rackId=None):
types.BaseApiObject.init(self, resource_root, locals())
def __str__(self):
return "<ApiHost>: %s (%s)" % (self.hostId, self.ipAddress)
def _path(self):
return HOSTS_PATH + '/' + self.hostId
def _put_host(self):
"""Update this resource
:return: The updated object.
"""
return self._put('', ApiHost, data=self)

View File

@ -0,0 +1,209 @@
# Copyright (c) 2014 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The contents of this file are mainly copied from cm_api sources,
# released by Cloudrea. Codes not used by Sahara CDH plugin are removed.
# You can find the original codes at
#
# https://github.com/cloudera/cm_api/tree/master/python/src/cm_api
#
# To satisfy the pep8 and python3 tests, we did some changes to the codes.
# We also change some importings to use Sahara inherited classes.
import cookielib
import json
import posixpath
import types
import urllib
import urllib2
from oslo_log import log as logging
import six
from sahara.i18n import _LW
from sahara.plugins.cdh import exceptions as ex
LOG = logging.getLogger(__name__)
class HttpClient(object):
"""Basic HTTP client tailored for rest APIs."""
def __init__(self, base_url, exc_class=ex.CMApiException):
"""Init Method
:param base_url: The base url to the API.
:param exc_class: An exception class to handle non-200 results.
Creates an HTTP(S) client to connect to the Cloudera Manager API.
"""
self._base_url = base_url.rstrip('/')
self._exc_class = exc_class
self._headers = {}
# Make a basic auth handler that does nothing. Set credentials later.
self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
authhandler = urllib2.HTTPBasicAuthHandler(self._passmgr)
# Make a cookie processor
cookiejar = cookielib.CookieJar()
self._opener = urllib2.build_opener(
urllib2.HTTPErrorProcessor(),
urllib2.HTTPCookieProcessor(cookiejar),
authhandler)
def set_basic_auth(self, username, password, realm):
"""Set up basic auth for the client
:param username: Login name.
:param password: Login password.
:param realm: The authentication realm.
:return: The current object
"""
self._passmgr.add_password(realm, self._base_url, username, password)
return self
def set_headers(self, headers):
"""Add headers to the request
:param headers: A dictionary with the key value pairs for the headers
:return: The current object
"""
self._headers = headers
return self
@property
def base_url(self):
return self._base_url
def _get_headers(self, headers):
res = self._headers.copy()
if headers:
res.update(headers)
return res
def execute(self, http_method, path, params=None, data=None, headers=None):
"""Submit an HTTP request
:param http_method: GET, POST, PUT, DELETE
:param path: The path of the resource.
:param params: Key-value parameter data.
:param data: The data to attach to the body of the request.
:param headers: The headers to set for this request.
:return: The result of urllib2.urlopen()
"""
# Prepare URL and params
url = self._make_url(path, params)
if http_method in ("GET", "DELETE"):
if data is not None:
LOG.warn(_LW("%(method)s method does not pass any data."
" Path '%(path)s'"),
{'method': http_method, 'path': path})
data = None
# Setup the request
request = urllib2.Request(url, data)
# Hack/workaround because urllib2 only does GET and POST
request.get_method = lambda: http_method
headers = self._get_headers(headers)
for k, v in headers.items():
request.add_header(k, v)
# Call it
LOG.debug("Method: %s, URL: %s" % (http_method, url))
try:
return self._opener.open(request)
except urllib2.HTTPError as ex:
message = six.text_type(ex)
try:
json_body = json.loads(message)
message = json_body['message']
except (ValueError, KeyError):
pass # Ignore json parsing error
raise self._exc_class(message)
def _make_url(self, path, params):
res = self._base_url
if path:
res += posixpath.normpath('/' + path.lstrip('/'))
if params:
param_str = urllib.urlencode(params, True)
res += '?' + param_str
return iri_to_uri(res)
#
# Method copied from Django
#
def iri_to_uri(iri):
"""Convert IRI to URI
Convert an Internationalized Resource Identifier (IRI) portion to a URI
portion that is suitable for inclusion in a URL.
This is the algorithm from section 3.1 of RFC 3987. However, since we are
assuming input is either UTF-8 or unicode already, we can simplify things a
little from the full method.
Returns an ASCII string containing the encoded result.
"""
# The list of safe characters here is constructed from the "reserved" and
# "unreserved" characters specified in sections 2.2 and 2.3 of RFC 3986:
# reserved = gen-delims / sub-delims
# gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@"
# sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
# / "*" / "+" / "," / ";" / "="
# unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
# Of the unreserved characters, urllib.quote already considers all but
# the ~ safe.
# The % character is also added to the list of safe characters here, as the
# end of section 3.1 of RFC 3987 specifically mentions that % must not be
# converted.
if iri is None:
return iri
return urllib.quote(smart_str(iri), safe="/#%[]=:;$&()+,!?*@'~")
#
# Method copied from Django
#
def smart_str(s, encoding='utf-8', strings_only=False, errors='strict'):
"""Convert string into bytestring version
Returns a bytestring version of 's', encoded as specified in 'encoding'.
If strings_only is True, don't convert (some) non-string-like objects.
"""
if strings_only and isinstance(s, (types.NoneType, int)):
return s
elif not isinstance(s, basestring):
try:
return six.text_type(s)
except UnicodeEncodeError:
if isinstance(s, Exception):
# An Exception subclass containing non-ASCII data that doesn't
# know how to print itself properly. We shouldn't raise a
# further exception.
return ' '.join([smart_str(arg, encoding, strings_only,
errors) for arg in s])
return unicode(s).encode(encoding, errors)
elif isinstance(s, unicode):
return s.encode(encoding, errors)
elif s and encoding != 'utf-8':
return s.decode('utf-8', errors).encode(encoding, errors)
else:
return s

View File

@ -0,0 +1,168 @@
# Copyright (c) 2014 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The contents of this file are mainly copied from cm_api sources,
# released by Cloudrea. Codes not used by Sahara CDH plugin are removed.
# You can find the original codes at
#
# https://github.com/cloudera/cm_api/tree/master/python/src/cm_api
#
# To satisfy the pep8 and python3 tests, we did some changes to the codes.
# We also change some importings to use Sahara inherited classes.
import json
import posixpath
import socket
import urllib2
from oslo_log import log as logging
import six
from sahara import context
from sahara.i18n import _
from sahara.i18n import _LE
from sahara.i18n import _LW
from sahara.plugins.cdh import exceptions as ex
LOG = logging.getLogger(__name__)
class Resource(object):
"""Base Resource
Encapsulates a resource, and provides actions to invoke on it.
"""
def __init__(self, client, relpath=""):
"""Constructor method
:param client: A Client object.
:param relpath: The relative path of the resource.
"""
self._client = client
self._path = relpath.strip('/')
self.retries = 3
self.retry_sleep = 3
@property
def base_url(self):
return self._client.base_url
def _join_uri(self, relpath):
if relpath is None:
return self._path
return self._path + posixpath.normpath('/' + relpath)
def invoke(self, method, relpath=None, params=None, data=None,
headers=None):
"""Invoke an API method
:return: Raw body or JSON dictionary (if response content type is
JSON).
"""
path = self._join_uri(relpath)
resp = self._client.execute(method,
path,
params=params,
data=data,
headers=headers)
try:
body = resp.read()
except Exception as ex:
raise ex.CMApiException(
_("Command %(method)s %(path)s failed: %(msg)s")
% {'method': method, 'path': path, 'msg': six.text_type(ex)})
LOG.debug("%s Got response: %s%s"
% (method, body[:32], "..." if len(body) > 32 else ""))
# Is the response application/json?
if (len(body) != 0 and resp.info().getmaintype() == "application"
and resp.info().getsubtype() == "json"):
try:
json_dict = json.loads(body)
return json_dict
except Exception as ex:
LOG.exception(_LE('JSON decode error: %s'), body)
raise ex
else:
return body
def get(self, relpath=None, params=None):
"""Invoke the GET method on a resource
:param relpath: Optional. A relative path to this resource's path.
:param params: Key-value data.
:return: A dictionary of the JSON result.
"""
for retry in six.moves.xrange(self.retries + 1):
if retry:
context.sleep(self.retry_sleep)
try:
return self.invoke("GET", relpath, params)
except (socket.error, urllib2.URLError) as e:
if "timed out" in six.text_type(e).lower():
LOG.warn(
_LW("Timeout issuing GET request for"
" %(path)s. %(post_msg)s")
% {'path': self._join_uri(relpath),
'post_msg':
_LW("Will retry") if retry < self.retries
else _LW("No retries left.")})
else:
raise e
else:
raise ex.CMApiException(_("Get retry max time reached."))
def delete(self, relpath=None, params=None):
"""Invoke the DELETE method on a resource
:param relpath: Optional. A relative path to this resource's path.
:param params: Key-value data.
:return: A dictionary of the JSON result.
"""
return self.invoke("DELETE", relpath, params)
def post(self, relpath=None, params=None, data=None, contenttype=None):
"""Invoke the POST method on a resource
:param relpath: Optional. A relative path to this resource's path.
:param params: Key-value data.
:param data: Optional. Body of the request.
:param contenttype: Optional.
:return: A dictionary of the JSON result.
"""
return self.invoke("POST", relpath, params, data,
self._make_headers(contenttype))
def put(self, relpath=None, params=None, data=None, contenttype=None):
"""Invoke the PUT method on a resource
:param relpath: Optional. A relative path to this resource's path.
:param params: Key-value data.
:param data: Optional. Body of the request.
:param contenttype: Optional.
:return: A dictionary of the JSON result.
"""
return self.invoke("PUT", relpath, params, data,
self._make_headers(contenttype))
def _make_headers(self, contenttype=None):
if contenttype:
return {'Content-Type': contenttype}
return None

View File

@ -0,0 +1,108 @@
# Copyright (c) 2014 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The contents of this file are mainly copied from cm_api sources,
# released by Cloudrea. Codes not used by Sahara CDH plugin are removed.
# You can find the original codes at
#
# https://github.com/cloudera/cm_api/tree/master/python/src/cm_api
#
# To satisfy the pep8 and python3 tests, we did some changes to the codes.
# We also change some importings to use Sahara inherited classes.
from sahara.plugins.cdh.client import types
ROLE_CONFIG_GROUPS_PATH = "/clusters/%s/services/%s/roleConfigGroups"
CM_ROLE_CONFIG_GROUPS_PATH = "/cm/service/roleConfigGroups"
def _get_role_config_groups_path(cluster_name, service_name):
if cluster_name:
return ROLE_CONFIG_GROUPS_PATH % (cluster_name, service_name)
else:
return CM_ROLE_CONFIG_GROUPS_PATH
def _get_role_config_group_path(cluster_name, service_name, name):
path = _get_role_config_groups_path(cluster_name, service_name)
return "%s/%s" % (path, name)
def get_all_role_config_groups(resource_root, service_name,
cluster_name="default"):
"""Get all role config groups in the specified service
:param resource_root: The root Resource object.
:param service_name: Service name.
:param cluster_name: Cluster name.
:return: A list of ApiRoleConfigGroup objects.
:since: API v3
"""
return types.call(resource_root.get,
_get_role_config_groups_path(cluster_name, service_name),
ApiRoleConfigGroup, True, api_version=3)
class ApiRoleConfigGroup(types.BaseApiResource):
_ATTRIBUTES = {
'name': None,
'displayName': None,
'roleType': None,
'config': types.Attr(types.ApiConfig),
'base': types.ROAttr(),
'serviceRef': types.ROAttr(types.ApiServiceRef),
}
def __init__(self, resource_root, name=None, displayName=None,
roleType=None, config=None):
types.BaseApiObject.init(self, resource_root, locals())
def __str__(self):
return ("<ApiRoleConfigGroup>: %s (cluster: %s; service: %s)"
% (self.name, self.serviceRef.clusterName,
self.serviceRef.serviceName))
def _api_version(self):
return 3
def _path(self):
return _get_role_config_group_path(self.serviceRef.clusterName,
self.serviceRef.serviceName,
self.name)
def get_config(self, view=None):
"""Retrieve the group's configuration
The 'summary' view contains strings as the dictionary values. The full
view contains types.ApiConfig instances as the values.
:param view: View to materialize ('full' or 'summary').
:return: Dictionary with configuration data.
"""
path = self._path() + '/config'
resp = self._get_resource_root().get(
path, params=(dict(view=view) if view else None))
return types.json_to_config(resp, view == 'full')
def update_config(self, config):
"""Update the group's configuration
:param config: Dictionary with configuration to update.
:return: Dictionary with updated configuration.
"""
path = self._path() + '/config'
resp = self._get_resource_root().put(
path, data=types.config_to_json(config))
return types.json_to_config(resp)

View File

@ -0,0 +1,187 @@
# Copyright (c) 2014 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The contents of this file are mainly copied from cm_api sources,
# released by Cloudrea. Codes not used by Sahara CDH plugin are removed.
# You can find the original codes at
#
# https://github.com/cloudera/cm_api/tree/master/python/src/cm_api
#
# To satisfy the pep8 and python3 tests, we did some changes to the codes.
# We also change some importings to use Sahara inherited classes.
from sahara.plugins.cdh.client import types
ROLES_PATH = "/clusters/%s/services/%s/roles"
CM_ROLES_PATH = "/cm/service/roles"
def _get_roles_path(cluster_name, service_name):
if cluster_name:
return ROLES_PATH % (cluster_name, service_name)
else:
return CM_ROLES_PATH
def _get_role_path(cluster_name, service_name, role_name):
path = _get_roles_path(cluster_name, service_name)
return "%s/%s" % (path, role_name)
def create_role(resource_root,
service_name,
role_type,
role_name,
host_id,
cluster_name="default"):
"""Create a role
:param resource_root: The root Resource object.
:param service_name: Service name
:param role_type: Role type
:param role_name: Role name
:param cluster_name: Cluster name
:return: An ApiRole object
"""
apirole = ApiRole(resource_root, role_name, role_type,
types.ApiHostRef(resource_root, host_id))
return types.call(resource_root.post,
_get_roles_path(cluster_name, service_name),
ApiRole, True, data=[apirole])[0]
def get_role(resource_root, service_name, name, cluster_name="default"):
"""Lookup a role by name
:param resource_root: The root Resource object.
:param service_name: Service name
:param name: Role name
:param cluster_name: Cluster name
:return: An ApiRole object
"""
return _get_role(resource_root, _get_role_path(cluster_name,
service_name, name))
def _get_role(resource_root, path):
return types.call(resource_root.get, path, ApiRole)
def get_all_roles(resource_root, service_name, cluster_name="default",
view=None):
"""Get all roles
:param resource_root: The root Resource object.
:param service_name: Service name
:param cluster_name: Cluster name
:return: A list of ApiRole objects.
"""
return types.call(resource_root.get,
_get_roles_path(cluster_name, service_name), ApiRole,
True, params=(dict(view=view) if view else None))
def get_roles_by_type(resource_root, service_name, role_type,
cluster_name="default", view=None):
"""Get all roles of a certain type in a service
:param resource_root: The root Resource object.
:param service_name: Service name
:param role_type: Role type
:param cluster_name: Cluster name
:return: A list of ApiRole objects.
"""
roles = get_all_roles(resource_root, service_name, cluster_name, view)
return [r for r in roles if r.type == role_type]
def delete_role(resource_root, service_name, name, cluster_name="default"):
"""Delete a role by name
:param resource_root: The root Resource object.
:param service_name: Service name
:param name: Role name
:param cluster_name: Cluster name
:return: The deleted ApiRole object
"""
return types.call(resource_root.delete,
_get_role_path(cluster_name, service_name, name),
ApiRole)
class ApiRole(types.BaseApiResource):
_ATTRIBUTES = {
'name': None,
'type': None,
'hostRef': types.Attr(types.ApiHostRef),
'roleState': types.ROAttr(),
'healthSummary': types.ROAttr(),
'healthChecks': types.ROAttr(),
'serviceRef': types.ROAttr(types.ApiServiceRef),
'configStale': types.ROAttr(),
'configStalenessStatus': types.ROAttr(),
'haStatus': types.ROAttr(),
'roleUrl': types.ROAttr(),
'commissionState': types.ROAttr(),
'maintenanceMode': types.ROAttr(),
'maintenanceOwners': types.ROAttr(),
'roleConfigGroupRef': types.ROAttr(types.ApiRoleConfigGroupRef),
'zooKeeperServerMode': types.ROAttr(),
}
def __init__(self, resource_root, name=None, type=None, hostRef=None):
types.BaseApiObject.init(self, resource_root, locals())
def __str__(self):
return ("<ApiRole>: %s (cluster: %s; service: %s)"
% (self.name, self.serviceRef.clusterName,
self.serviceRef.serviceName))
def _path(self):
return _get_role_path(self.serviceRef.clusterName,
self.serviceRef.serviceName,
self.name)
def _get_log(self, log):
path = "%s/logs/%s" % (self._path(), log)
return self._get_resource_root().get(path)
def get_commands(self, view=None):
"""Retrieve a list of running commands for this role
:param view: View to materialize ('full' or 'summary')
:return: A list of running commands.
"""
return self._get("commands", types.ApiCommand, True,
params=(dict(view=view) if view else None))
def get_config(self, view=None):
"""Retrieve the role's configuration
The 'summary' view contains strings as the dictionary values. The full
view contains types.ApiConfig instances as the values.
:param view: View to materialize ('full' or 'summary')
:return: Dictionary with configuration data.
"""
return self._get_config("config", view)
def update_config(self, config):
"""Update the role's configuration
:param config: Dictionary with configuration to update.
:return: Dictionary with updated configuration.
"""
return self._update_config("config", config)

View File

@ -0,0 +1,423 @@
# Copyright (c) 2014 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The contents of this file are mainly copied from cm_api sources,
# released by Cloudrea. Codes not used by Sahara CDH plugin are removed.
# You can find the original codes at
#
# https://github.com/cloudera/cm_api/tree/master/python/src/cm_api
#
# To satisfy the pep8 and python3 tests, we did some changes to the codes.
# We also change some importings to use Sahara inherited classes.
import json
import six
from sahara.plugins.cdh.client import role_config_groups
from sahara.plugins.cdh.client import roles
from sahara.plugins.cdh.client import types
SERVICES_PATH = "/clusters/%s/services"
SERVICE_PATH = "/clusters/%s/services/%s"
ROLETYPES_CFG_KEY = 'roleTypeConfigs'
def create_service(resource_root, name, service_type,
cluster_name="default"):
"""Create a service
:param resource_root: The root Resource object.
:param name: Service name
:param service_type: Service type
:param cluster_name: Cluster name
:return: An ApiService object
"""
apiservice = ApiService(resource_root, name, service_type)
return types.call(resource_root.post, SERVICES_PATH % (cluster_name,),
ApiService, True, data=[apiservice])[0]
def get_service(resource_root, name, cluster_name="default"):
"""Lookup a service by name
:param resource_root: The root Resource object.
:param name: Service name
:param cluster_name: Cluster name
:return: An ApiService object
"""
return _get_service(resource_root, "%s/%s"
% (SERVICES_PATH % (cluster_name,), name))
def _get_service(resource_root, path):
return types.call(resource_root.get, path, ApiService)
def get_all_services(resource_root, cluster_name="default", view=None):
"""Get all services
:param resource_root: The root Resource object.
:param cluster_name: Cluster name
:return: A list of ApiService objects.
"""
return types.call(resource_root.get, SERVICES_PATH % (cluster_name,),
ApiService, True,
params=(dict(view=view) if view else None))
def delete_service(resource_root, name, cluster_name="default"):
"""Delete a service by name
:param resource_root: The root Resource object.
:param name: Service name
:param cluster_name: Cluster name
:return: The deleted ApiService object
"""
return types.call(resource_root.delete,
"%s/%s" % (SERVICES_PATH % (cluster_name,), name),
ApiService)
class ApiService(types.BaseApiResource):
_ATTRIBUTES = {
'name': None,
'type': None,
'displayName': None,
'serviceState': types.ROAttr(),
'healthSummary': types.ROAttr(),
'healthChecks': types.ROAttr(),
'clusterRef': types.ROAttr(types.ApiClusterRef),
'configStale': types.ROAttr(),
'configStalenessStatus': types.ROAttr(),
'clientConfigStalenessStatus': types.ROAttr(),
'serviceUrl': types.ROAttr(),
'maintenanceMode': types.ROAttr(),
'maintenanceOwners': types.ROAttr(),
}
def __init__(self, resource_root, name=None, type=None):
types.BaseApiObject.init(self, resource_root, locals())
def __str__(self):
return ("<ApiService>: %s (cluster: %s)"
% (self.name, self._get_cluster_name()))
def _get_cluster_name(self):
if hasattr(self, 'clusterRef') and self.clusterRef:
return self.clusterRef.clusterName
return None
def _path(self):
"""Return the API path for this service
This method assumes that lack of a cluster reference means that the
object refers to the Cloudera Management Services instance.
"""
if self._get_cluster_name():
return SERVICE_PATH % (self._get_cluster_name(), self.name)
else:
return '/cm/service'
def _role_cmd(self, cmd, roles, api_version=1):
return self._post("roleCommands/" + cmd, types.ApiBulkCommandList,
data=roles, api_version=api_version)
def _parse_svc_config(self, json_dic, view=None):
"""Parse a json-decoded ApiServiceConfig dictionary into a 2-tuple
:param json_dic: The json dictionary with the config data.
:param view: View to materialize.
:return: 2-tuple (service config dictionary, role type configurations)
"""
svc_config = types.json_to_config(json_dic, view == 'full')
rt_configs = {}
if ROLETYPES_CFG_KEY in json_dic:
for rt_config in json_dic[ROLETYPES_CFG_KEY]:
rt_configs[rt_config['roleType']] = types.json_to_config(
rt_config, view == 'full')
return (svc_config, rt_configs)
def create_yarn_job_history_dir(self):
"""Create the Yarn job history directory
:return: Reference to submitted command.
:since: API v6
"""
return self._cmd('yarnCreateJobHistoryDirCommand', api_version=6)
def get_config(self, view=None):
"""Retrieve the service's configuration
Retrieves both the service configuration and role type configuration
for each of the service's supported role types. The role type
configurations are returned as a dictionary, whose keys are the
role type name, and values are the respective configuration
dictionaries.
The 'summary' view contains strings as the dictionary values. The full
view contains types.ApiConfig instances as the values.
:param view: View to materialize ('full' or 'summary')
:return: 2-tuple (service config dictionary, role type configurations)
"""
path = self._path() + '/config'
resp = self._get_resource_root().get(
path, params=(dict(view=view) if view else None))
return self._parse_svc_config(resp, view)
def update_config(self, svc_config, **rt_configs):
"""Update the service's configuration
:param svc_config: Dictionary with service configuration to update.
:param rt_configs: Dict of role type configurations to update.
:return: 2-tuple (service config dictionary, role type configurations)
"""
path = self._path() + '/config'
if svc_config:
data = types.config_to_api_list(svc_config)
else:
data = {}
if rt_configs:
rt_list = []
for rt, cfg in six.iteritems(rt_configs):
rt_data = types.config_to_api_list(cfg)
rt_data['roleType'] = rt
rt_list.append(rt_data)
data[ROLETYPES_CFG_KEY] = rt_list
resp = self._get_resource_root().put(path, data=json.dumps(data))
return self._parse_svc_config(resp)
def create_role(self, role_name, role_type, host_id):
"""Create a role
:param role_name: Role name
:param role_type: Role type
:param host_id: ID of the host to assign the role to
:return: An ApiRole object
"""
return roles.create_role(self._get_resource_root(), self.name,
role_type, role_name, host_id,
self._get_cluster_name())
def delete_role(self, name):
"""Delete a role by name
:param name: Role name
:return: The deleted ApiRole object
"""
return roles.delete_role(self._get_resource_root(), self.name, name,
self._get_cluster_name())
def get_roles_by_type(self, role_type, view=None):
"""Get all roles of a certain type in a service
:param role_type: Role type
:param view: View to materialize ('full' or 'summary')
:return: A list of ApiRole objects.
"""
return roles.get_roles_by_type(self._get_resource_root(), self.name,
role_type, self._get_cluster_name(),
view)
def get_all_role_config_groups(self):
"""Get a list of role configuration groups in the service
:return: A list of ApiRoleConfigGroup objects.
:since: API v3
"""
return role_config_groups.get_all_role_config_groups(
self._get_resource_root(), self.name, self._get_cluster_name())
def start(self):
"""Start a service
:return: Reference to the submitted command.
"""
return self._cmd('start')
def stop(self):
"""Stop a service
:return: Reference to the submitted command.
"""
return self._cmd('stop')
def restart(self):
"""Restart a service
:return: Reference to the submitted command.
"""
return self._cmd('restart')
def start_roles(self, *role_names):
"""Start a list of roles
:param role_names: names of the roles to start.
:return: List of submitted commands.
"""
return self._role_cmd('start', role_names)
def create_hbase_root(self):
"""Create the root directory of an HBase service
:return: Reference to the submitted command.
"""
return self._cmd('hbaseCreateRoot')
def create_hdfs_tmp(self):
"""Create /tmp directory in HDFS
Create the /tmp directory in HDFS with appropriate ownership and
permissions.
:return: Reference to the submitted command
:since: API v2
"""
return self._cmd('hdfsCreateTmpDir')
def refresh(self, *role_names):
"""Execute the "refresh" command on a set of roles
:param role_names: Names of the roles to refresh.
:return: Reference to the submitted command.
"""
return self._role_cmd('refresh', role_names)
def decommission(self, *role_names):
"""Decommission roles in a service
:param role_names: Names of the roles to decommission.
:return: Reference to the submitted command.
"""
return self._cmd('decommission', data=role_names)
def deploy_client_config(self, *role_names):
"""Deploys client configuration to the hosts where roles are running
:param role_names: Names of the roles to decommission.
:return: Reference to the submitted command.
"""
return self._cmd('deployClientConfig', data=role_names)
def format_hdfs(self, *namenodes):
"""Format NameNode instances of an HDFS service
:param namenodes: Name of NameNode instances to format.
:return: List of submitted commands.
"""
return self._role_cmd('hdfsFormat', namenodes)
def install_oozie_sharelib(self):
"""Installs the Oozie ShareLib
Oozie must be stopped before running this command.
:return: Reference to the submitted command.
:since: API v3
"""
return self._cmd('installOozieShareLib', api_version=3)
def create_oozie_db(self):
"""Creates the Oozie Database Schema in the configured database
:return: Reference to the submitted command.
:since: API v2
"""
return self._cmd('createOozieDb', api_version=2)
def upgrade_oozie_db(self):
"""Upgrade Oozie Database schema as part of a major version upgrade
:return: Reference to the submitted command.
:since: API v6
"""
return self._cmd('oozieUpgradeDb', api_version=6)
def create_hive_metastore_tables(self):
"""Creates the Hive metastore tables in the configured database
Will do nothing if tables already exist. Will not perform an upgrade.
:return: Reference to the submitted command.
:since: API v3
"""
return self._cmd('hiveCreateMetastoreDatabaseTables', api_version=3)
def create_hive_warehouse(self):
"""Creates the Hive warehouse directory in HDFS
:return: Reference to the submitted command.
:since: API v3
"""
return self._cmd('hiveCreateHiveWarehouse')
def create_hive_userdir(self):
"""Creates the Hive user directory in HDFS
:return: Reference to the submitted command.
:since: API v4
"""
return self._cmd('hiveCreateHiveUserDir')
class ApiServiceSetupInfo(ApiService):
_ATTRIBUTES = {
'name': None,
'type': None,
'config': types.Attr(types.ApiConfig),
'roles': types.Attr(roles.ApiRole),
}
def __init__(self, name=None, type=None,
config=None, roles=None):
# The BaseApiObject expects a resource_root, which we don't care about
resource_root = None
# Unfortunately, the json key is called "type". So our input arg
# needs to be called "type" as well, despite it being a python keyword.
types.BaseApiObject.init(self, None, locals())
def set_config(self, config):
"""Set the service configuration
:param config: A dictionary of config key/value
"""
if self.config is None:
self.config = {}
self.config.update(types.config_to_api_list(config))
def add_role_info(self, role_name, role_type, host_id, config=None):
"""Add a role info
The role will be created along with the service setup.
:param role_name: Role name
:param role_type: Role type
:param host_id: The host where the role should run
:param config: (Optional) A dictionary of role config values
"""
if self.roles is None:
self.roles = []
api_config_list = (config is not None
and types.config_to_api_list(config)
or None)
self.roles.append({
'name': role_name,
'type': role_type,
'hostRef': {'hostId': host_id},
'config': api_config_list})

View File

@ -0,0 +1,680 @@
# Copyright (c) 2014 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The contents of this file are mainly copied from cm_api sources,
# released by Cloudrea. Codes not used by Sahara CDH plugin are removed.
# You can find the original codes at
#
# https://github.com/cloudera/cm_api/tree/master/python/src/cm_api
#
# To satisfy the pep8 and python3 tests, we did some changes to the codes.
# We also change some importings to use Sahara inherited classes.
import copy
import datetime
import json
import time
import six
from sahara import context
from sahara.i18n import _
from sahara.plugins.cdh import exceptions as ex
class Attr(object):
"""Base Attribute
Encapsulates information about an attribute in the JSON encoding of the
object. It identifies properties of the attribute such as whether it's
read-only, its type, etc.
"""
DATE_FMT = "%Y-%m-%dT%H:%M:%S.%fZ"
def __init__(self, atype=None, rw=True, is_api_list=False):
self._atype = atype
self._is_api_list = is_api_list
self.rw = rw
def to_json(self, value, preserve_ro):
"""Returns the JSON encoding of the given attribute value
If the value has a 'to_json_dict' object, that method is called.
Otherwise, the following values are returned for each input type:
- datetime.datetime: string with the API representation of a date.
- dictionary: if 'atype' is ApiConfig, a list of ApiConfig objects.
- python list: python list (or ApiList) with JSON encoding of items
- the raw value otherwise
"""
if hasattr(value, 'to_json_dict'):
return value.to_json_dict(preserve_ro)
elif isinstance(value, dict) and self._atype == ApiConfig:
return config_to_api_list(value)
elif isinstance(value, datetime.datetime):
return value.strftime(self.DATE_FMT)
elif isinstance(value, list) or isinstance(value, tuple):
if self._is_api_list:
return ApiList(value).to_json_dict()
else:
return [self.to_json(x, preserve_ro) for x in value]
else:
return value
def from_json(self, resource_root, data):
"""Parses the given JSON value into an appropriate python object
This means:
- a datetime.datetime if 'atype' is datetime.datetime
- a converted config dictionary or config list if 'atype' is ApiConfig
- if the attr is an API list, an ApiList with instances of 'atype'
- an instance of 'atype' if it has a 'from_json_dict' method
- a python list with decoded versions of the member objects if the
input is a python list.
- the raw value otherwise
"""
if data is None:
return None
if self._atype == datetime.datetime:
return datetime.datetime.strptime(data, self.DATE_FMT)
elif self._atype == ApiConfig:
# ApiConfig is special. We want a python dictionary for summary
# views, but an ApiList for full views. Try to detect each case
# from the JSON data.
if not data['items']:
return {}
first = data['items'][0]
return json_to_config(data, len(first) == 2)
elif self._is_api_list:
return ApiList.from_json_dict(data, resource_root, self._atype)
elif isinstance(data, list):
return [self.from_json(resource_root, x) for x in data]
elif hasattr(self._atype, 'from_json_dict'):
return self._atype.from_json_dict(data, resource_root)
else:
return data
class ROAttr(Attr):
"""Subclass that just defines the attribute as read-only."""
def __init__(self, atype=None, is_api_list=False):
Attr.__init__(self, atype=atype, rw=False, is_api_list=is_api_list)
def check_api_version(resource_root, min_version):
"""Check API version
Checks if the resource_root's API version it at least the given minimum
version.
"""
if resource_root.version < min_version:
raise ex.CMApiVersionError(
_("API version %(minv)s is required but %(acv)s is in use.")
% {'minv': min_version, 'acv': resource_root.version})
def call(method, path, ret_type,
ret_is_list=False, data=None, params=None, api_version=1):
"""Call a resource method
Generic function for calling a resource method and automatically dealing
with serialization of parameters and deserialization of return values.
:param method: method to call (must be bound to a resource;
e.g., "resource_root.get").
:param path: the full path of the API method to call.
:param ret_type: return type of the call.
:param ret_is_list: whether the return type is an ApiList.
:param data: Optional data to send as payload to the call.
:param params: Optional query parameters for the call.
:param api_version: minimum API version for the call.
"""
check_api_version(method.im_self, api_version)
if data is not None:
data = json.dumps(Attr(is_api_list=True).to_json(data, False))
ret = method(path, data=data, params=params)
else:
ret = method(path, params=params)
if ret_type is None:
return
elif ret_is_list:
return ApiList.from_json_dict(ret, method.im_self, ret_type)
elif isinstance(ret, list):
return [ret_type.from_json_dict(x, method.im_self) for x in ret]
else:
return ret_type.from_json_dict(ret, method.im_self)
class BaseApiObject(object):
"""The BaseApiObject helps with (de)serialization from/to JSON
The derived class has two ways of defining custom attributes:
- Overwriting the '_ATTRIBUTES' field with the attribute dictionary
- Override the _get_attributes() method, in case static initialization of
the above field is not possible.
It's recommended that the _get_attributes() implementation do caching to
avoid computing the dictionary on every invocation.
The derived class's constructor must call the base class's init() static
method. All constructor arguments (aside from self and resource_root) must
be keywords arguments with default values (typically None), or
from_json_dict() will not work.
"""
_ATTRIBUTES = {}
_WHITELIST = ('_resource_root', '_attributes')
@classmethod
def _get_attributes(cls):
"""Get an attribute dictionary
Returns a map of property names to attr instances (or None for default
attribute behavior) describing the properties of the object.
By default, this method will return the class's _ATTRIBUTES field.
Classes can override this method to do custom initialization of the
attributes when needed.
"""
return cls._ATTRIBUTES
@staticmethod
def init(obj, resource_root, attrs=None):
"""Wraper of real constructor
Wraper around the real constructor to avoid issues with the 'self'
argument. Call like this, from a subclass's constructor:
- BaseApiObject.init(self, locals())
"""
# This works around http://bugs.python.org/issue2646
# We use unicode strings as keys in kwargs.
str_attrs = {}
if attrs:
for k, v in six.iteritems(attrs):
if k not in ('self', 'resource_root'):
str_attrs[k] = v
BaseApiObject.__init__(obj, resource_root, **str_attrs)
def __init__(self, resource_root, **attrs):
"""Init method
Initializes internal state and sets all known writable properties of
the object to None. Then initializes the properties given in the
provided attributes dictionary.
:param resource_root: API resource object.
:param attrs: optional dictionary of attributes to set. This should
only contain r/w attributes.
"""
self._resource_root = resource_root
for name, attr in six.iteritems(self._get_attributes()):
object.__setattr__(self, name, None)
if attrs:
self._set_attrs(attrs, from_json=False)
def _set_attrs(self, attrs, allow_ro=False, from_json=True):
"""Set attributes from dictionary
Sets all the attributes in the dictionary. Optionally, allows setting
read-only attributes (e.g. when deserializing from JSON) and skipping
JSON deserialization of values.
"""
for k, v in six.iteritems(attrs):
attr = self._check_attr(k, allow_ro)
if attr and from_json:
v = attr.from_json(self._get_resource_root(), v)
object.__setattr__(self, k, v)
def __setattr__(self, name, val):
if name not in BaseApiObject._WHITELIST:
self._check_attr(name, False)
object.__setattr__(self, name, val)
def _check_attr(self, name, allow_ro):
if name not in self._get_attributes():
raise ex.CMApiAttributeError(
_('Invalid property %(attname)s for class %(classname)s.')
% {'attname': name, 'classname': self.__class__.__name__})
attr = self._get_attributes()[name]
if not allow_ro and attr and not attr.rw:
raise ex.CMApiAttributeError(
_('Attribute %(attname)s of class %(classname)s '
'is read only.')
% {'attname': name, 'classname': self.__class__.__name__})
return attr
def _get_resource_root(self):
return self._resource_root
def _update(self, api_obj):
"""Copy state from api_obj to this object."""
if not isinstance(self, api_obj.__class__):
raise ex.CMApiValueError(
_("Class %(class1)s does not derive from %(class2)s; "
"cannot update attributes.")
% {'class1': self.__class__, 'class2': api_obj.__class__})
for name in self._get_attributes().keys():
try:
val = getattr(api_obj, name)
setattr(self, name, val)
except AttributeError:
pass
def to_json_dict(self, preserve_ro=False):
dic = {}
for name, attr in six.iteritems(self._get_attributes()):
if not preserve_ro and attr and not attr.rw:
continue
try:
value = getattr(self, name)
if value is not None:
if attr:
dic[name] = attr.to_json(value, preserve_ro)
else:
dic[name] = value
except AttributeError:
pass
return dic
def __str__(self):
"""Give a printable format of an attribute
Default implementation of __str__. Uses the type name and the first
attribute retrieved from the attribute map to create the string.
"""
name = self._get_attributes().keys()[0]
value = getattr(self, name, None)
return "<%s>: %s = %s" % (self.__class__.__name__, name, value)
@classmethod
def from_json_dict(cls, dic, resource_root):
obj = cls(resource_root)
obj._set_attrs(dic, allow_ro=True)
return obj
class BaseApiResource(BaseApiObject):
"""Base ApiResource
A specialization of BaseApiObject that provides some utility methods for
resources. This class allows easier serialization / deserialization of
parameters and return values.
"""
def _api_version(self):
"""Get API version
Returns the minimum API version for this resource. Defaults to 1.
"""
return 1
def _path(self):
"""Get resource path
Returns the path to the resource.
e.g., for a service 'foo' in cluster 'bar', this should return
'/clusters/bar/services/foo'.
"""
raise NotImplementedError
def _require_min_api_version(self, version):
"""Check mininum verson requirement
Raise an exception if the version of the api is less than the given
version.
:param version: The minimum required version.
"""
actual_version = self._get_resource_root().version
version = max(version, self._api_version())
if actual_version < version:
raise ex.CMApiVersionError(
_("API version %(minv)s is required but %(acv)s is in use.")
% {'minv': version, 'acv': actual_version})
def _cmd(self, command, data=None, params=None, api_version=1):
"""Invoke a command on the resource
Invokes a command on the resource. Commands are expected to be under
the "commands/" sub-resource.
"""
return self._post("commands/" + command, ApiCommand,
data=data, params=params, api_version=api_version)
def _get_config(self, rel_path, view, api_version=1):
"""Get resource configurations
Retrieves an ApiConfig list from the given relative path.
"""
self._require_min_api_version(api_version)
params = dict(view=view) if view else None
resp = self._get_resource_root().get(self._path() + '/' + rel_path,
params=params)
return json_to_config(resp, view == 'full')
def _update_config(self, rel_path, config, api_version=1):
self._require_min_api_version(api_version)
resp = self._get_resource_root().put(self._path() + '/' + rel_path,
data=config_to_json(config))
return json_to_config(resp, False)
def _delete(self, rel_path, ret_type, ret_is_list=False, params=None,
api_version=1):
return self._call('delete', rel_path, ret_type, ret_is_list, None,
params, api_version)
def _get(self, rel_path, ret_type, ret_is_list=False, params=None,
api_version=1):
return self._call('get', rel_path, ret_type, ret_is_list, None,
params, api_version)
def _post(self, rel_path, ret_type, ret_is_list=False, data=None,
params=None, api_version=1):
return self._call('post', rel_path, ret_type, ret_is_list, data,
params, api_version)
def _put(self, rel_path, ret_type, ret_is_list=False, data=None,
params=None, api_version=1):
return self._call('put', rel_path, ret_type, ret_is_list, data,
params, api_version)
def _call(self, method, rel_path, ret_type, ret_is_list=False, data=None,
params=None, api_version=1):
path = self._path()
if rel_path:
path += '/' + rel_path
return call(getattr(self._get_resource_root(), method),
path,
ret_type,
ret_is_list,
data,
params,
api_version)
class ApiList(BaseApiObject):
"""A list of some api object"""
LIST_KEY = "items"
def __init__(self, objects, resource_root=None, **attrs):
BaseApiObject.__init__(self, resource_root, **attrs)
# Bypass checks in BaseApiObject.__setattr__
object.__setattr__(self, 'objects', objects)
def __str__(self):
return ("<ApiList>(%d): [%s]" % (len(self.objects),
", ".join([str(item) for item in self.objects])))
def to_json_dict(self, preserve_ro=False):
ret = BaseApiObject.to_json_dict(self, preserve_ro)
attr = Attr()
ret[ApiList.LIST_KEY] = [attr.to_json(x, preserve_ro)
for x in self.objects]
return ret
def __len__(self):
return self.objects.__len__()
def __iter__(self):
return self.objects.__iter__()
def __getitem__(self, i):
return self.objects.__getitem__(i)
def __getslice(self, i, j):
return self.objects.__getslice__(i, j)
@classmethod
def from_json_dict(cls, dic, resource_root, member_cls=None):
if not member_cls:
member_cls = cls._MEMBER_CLASS
attr = Attr(atype=member_cls)
items = []
if ApiList.LIST_KEY in dic:
items = [attr.from_json(resource_root, x)
for x in dic[ApiList.LIST_KEY]]
ret = cls(items)
# If the class declares custom attributes, populate them based on the
# input dict. The check avoids extra overhead for the common case,
# where we just have a plain list. _set_attrs() also does not
# understand the "items" attribute, so it can't be in the input data.
if cls._ATTRIBUTES:
if ApiList.LIST_KEY in dic:
dic = copy.copy(dic)
del dic[ApiList.LIST_KEY]
ret._set_attrs(dic, allow_ro=True)
return ret
class ApiHostRef(BaseApiObject):
_ATTRIBUTES = {
'hostId': None,
}
def __init__(self, resource_root, hostId=None):
BaseApiObject.init(self, resource_root, locals())
def __str__(self):
return "<ApiHostRef>: %s" % (self.hostId)
class ApiServiceRef(BaseApiObject):
_ATTRIBUTES = {
'clusterName': None,
'serviceName': None,
'peerName': None,
}
def __init__(self, resource_root, serviceName=None, clusterName=None,
peerName=None):
BaseApiObject.init(self, resource_root, locals())
class ApiClusterRef(BaseApiObject):
_ATTRIBUTES = {
'clusterName': None,
}
def __init__(self, resource_root, clusterName=None):
BaseApiObject.init(self, resource_root, locals())
class ApiRoleRef(BaseApiObject):
_ATTRIBUTES = {
'clusterName': None,
'serviceName': None,
'roleName': None,
}
def __init__(self, resource_root, serviceName=None, roleName=None,
clusterName=None):
BaseApiObject.init(self, resource_root, locals())
class ApiRoleConfigGroupRef(BaseApiObject):
_ATTRIBUTES = {
'roleConfigGroupName': None,
}
def __init__(self, resource_root, roleConfigGroupName=None):
BaseApiObject.init(self, resource_root, locals())
class ApiCommand(BaseApiObject):
SYNCHRONOUS_COMMAND_ID = -1
@classmethod
def _get_attributes(cls):
if not ('_ATTRIBUTES' in cls.__dict__):
cls._ATTRIBUTES = {
'id': ROAttr(),
'name': ROAttr(),
'startTime': ROAttr(datetime.datetime),
'endTime': ROAttr(datetime.datetime),
'active': ROAttr(),
'success': ROAttr(),
'resultMessage': ROAttr(),
'clusterRef': ROAttr(ApiClusterRef),
'serviceRef': ROAttr(ApiServiceRef),
'roleRef': ROAttr(ApiRoleRef),
'hostRef': ROAttr(ApiHostRef),
'children': ROAttr(ApiCommand, is_api_list=True),
'parent': ROAttr(ApiCommand),
'resultDataUrl': ROAttr(),
}
return cls._ATTRIBUTES
def __str__(self):
return ("<ApiCommand>: '%s' (id: %s; active: %s; success: %s)"
% (self.name, self.id, self.active, self.success))
def _path(self):
return '/commands/%d' % self.id
def fetch(self):
"""Retrieve updated data about the command from the server
:return: A new ApiCommand object.
"""
if self.id == ApiCommand.SYNCHRONOUS_COMMAND_ID:
return self
resp = self._get_resource_root().get(self._path())
return ApiCommand.from_json_dict(resp, self._get_resource_root())
def wait(self, timeout=None):
"""Wait for command to finish
:param timeout: (Optional) Max amount of time (in seconds) to wait.
Wait forever by default.
:return: The final ApiCommand object, containing the last known state.
The command may still be running in case of timeout.
"""
if self.id == ApiCommand.SYNCHRONOUS_COMMAND_ID:
return self
SLEEP_SEC = 5
if timeout is None:
deadline = None
else:
deadline = time.time() + timeout
while True:
cmd = self.fetch()
if not cmd.active:
return cmd
if deadline is not None:
now = time.time()
if deadline < now:
return cmd
else:
context.sleep(min(SLEEP_SEC, deadline - now))
else:
context.sleep(SLEEP_SEC)
def abort(self):
"""Abort a running command
:return: A new ApiCommand object with the updated information.
"""
if self.id == ApiCommand.SYNCHRONOUS_COMMAND_ID:
return self
path = self._path() + '/abort'
resp = self._get_resource_root().post(path)
return ApiCommand.from_json_dict(resp, self._get_resource_root())
class ApiBulkCommandList(ApiList):
_ATTRIBUTES = {
'errors': ROAttr(),
}
_MEMBER_CLASS = ApiCommand
#
# Configuration helpers.
#
class ApiConfig(BaseApiObject):
_ATTRIBUTES = {
'name': None,
'value': None,
'required': ROAttr(),
'default': ROAttr(),
'displayName': ROAttr(),
'description': ROAttr(),
'relatedName': ROAttr(),
'validationState': ROAttr(),
'validationMessage': ROAttr(),
}
def __init__(self, resource_root, name=None, value=None):
BaseApiObject.init(self, resource_root, locals())
def __str__(self):
return "<ApiConfig>: %s = %s" % (self.name, self.value)
def config_to_api_list(dic):
"""Convert a python dictionary into an ApiConfig list
Converts a python dictionary into a list containing the proper
ApiConfig encoding for configuration data.
:param dic: Key-value pairs to convert.
:return: JSON dictionary of an ApiConfig list (*not* an ApiList).
"""
config = []
for k, v in six.iteritems(dic):
config.append({'name': k, 'value': v})
return {ApiList.LIST_KEY: config}
def config_to_json(dic):
"""Converts a python dictionary into a JSON payload
The payload matches the expected "apiConfig list" type used to update
configuration parameters using the API.
:param dic: Key-value pairs to convert.
:return: String with the JSON-encoded data.
"""
return json.dumps(config_to_api_list(dic))
def json_to_config(dic, full=False):
"""Converts a JSON-decoded config dictionary to a python dictionary
When materializing the full view, the values in the dictionary will be
instances of ApiConfig, instead of strings.
:param dic: JSON-decoded config dictionary.
:param full: Whether to materialize the full view of the config data.
:return: Python dictionary with config data.
"""
config = {}
for entry in dic['items']:
k = entry['name']
if full:
config[k] = ApiConfig.from_json_dict(entry, None)
else:
config[k] = entry.get('value')
return config

View File

@ -15,20 +15,13 @@
import functools
# cm_api client is not present in OS requirements
try:
from cm_api import api_client
from cm_api.endpoints import services
except ImportError:
api_client = None
services = None
from oslo_log import log as logging
from oslo_utils import timeutils
from sahara import context
from sahara.i18n import _
from sahara.i18n import _LE
from sahara.plugins.cdh.client import api_client
from sahara.plugins.cdh.client import services
from sahara.plugins import exceptions as ex
from sahara.utils import cluster_progress_ops as cpo
@ -69,15 +62,6 @@ class ClouderaUtils(object):
# pu will be defined in derived class.
self.pu = None
def have_cm_api_libs(self):
return api_client and services
def validate_cm_api_libs(self):
if not self.have_cm_api_libs():
LOG.error(_LE("For provisioning cluster with CDH plugin install"
" 'cm_api' package version 6.0.2 or later."))
raise ex.HadoopProvisionError(_("'cm_api' is not installed."))
def get_api_client(self, cluster):
manager_ip = self.pu.get_manager(cluster).management_ip
return api_client.ApiResource(manager_ip,

View File

@ -0,0 +1,78 @@
# Copyright (c) 2015 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara import exceptions as e
from sahara.i18n import _
class CMApiVersionError(e.SaharaException):
"""Exception indicating that CM API Version does not meet requirement.
A message indicating the reason for failure must be provided.
"""
base_message = _("CM API version not meet requirement: %s")
def __init__(self, message):
self.code = "CM_API_VERSION_ERROR"
self.message = self.base_message % message
super(CMApiVersionError, self).__init__()
class CMApiException(e.SaharaException):
"""Exception Type from CM API Errors.
Any error result from the CM API is converted into this exception type.
This handles errors from the HTTP level as well as the API level.
"""
base_message = _("CM API error: %s")
def __init__(self, message):
self.code = "CM_API_EXCEPTION"
self.message = self.base_message % message
super(CMApiException, self).__init__()
class CMApiAttributeError(e.SaharaException):
"""Exception indicating a CM API attribute error.
A message indicating the reason for failure must be provided.
"""
base_message = _("CM API attribute error: %s")
def __init__(self, message):
self.code = "CM_API_ATTRIBUTE_ERROR"
self.message = self.base_message % message
super(CMApiAttributeError, self).__init__()
class CMApiValueError(e.SaharaException):
"""Exception indicating a CM API value error.
A message indicating the reason for failure must be provided.
"""
base_message = _("CM API value error: %s")
def __init__(self, message):
self.code = "CM_API_VALUE_ERROR"
self.message = self.base_message % message
super(CMApiValueError, self).__init__()

View File

@ -13,14 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# cm_api client is not present in OS requirements
try:
from cm_api import api_client
from cm_api.endpoints import services
except ImportError:
api_client = None
services = None
import six
from sahara.i18n import _

View File

@ -15,7 +15,7 @@
import json
from cm_api import api_client
from sahara.plugins.cdh.client import api_client
# -- cm config --

View File

@ -13,15 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from sahara.i18n import _
from sahara.plugins.cdh.v5 import plugin_utils as pu
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u
from sahara.utils import general as gu
LOG = logging.getLogger(__name__)
PU = pu.PluginUtilsV5()

View File

@ -13,9 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from sahara import conductor
from sahara import context
from sahara.plugins.cdh import abstractversionhandler as avm
@ -27,9 +24,6 @@ from sahara.plugins.cdh.v5 import validation as vl
conductor = conductor.API
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CU = cu.ClouderaUtilsV5()
@ -63,7 +57,6 @@ class VersionHandler(avm.AbstractVersionHandler):
}
def validate(self, cluster):
CU.validate_cm_api_libs()
vl.validate_cluster_creating(cluster)
def configure_cluster(self, cluster):

View File

@ -13,14 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# cm_api client is not present in OS requirements
try:
from cm_api import api_client
from cm_api.endpoints import services
except ImportError:
api_client = None
services = None
import six
from sahara.i18n import _
@ -79,13 +71,6 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
cu.ClouderaUtils.__init__(self)
self.pu = pu.PluginUtilsV530()
def get_api_client(self, cluster):
manager_ip = self.pu.get_manager(cluster).management_ip
return api_client.ApiResource(manager_ip,
username=self.CM_DEFAULT_USERNAME,
password=self.CM_DEFAULT_PASSWD,
version=self.CM_API_VERSION)
def get_service_by_role(self, process, cluster=None, instance=None):
cm_cluster = None
if cluster:

View File

@ -15,8 +15,7 @@
import json
from cm_api import api_client
from sahara.plugins.cdh.client import api_client
# -- cm config --

View File

@ -13,15 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from sahara.i18n import _
from sahara.plugins.cdh.v5_3_0 import plugin_utils as pu
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u
from sahara.utils import general as gu
LOG = logging.getLogger(__name__)
PU = pu.PluginUtilsV530()

View File

@ -13,9 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from sahara import conductor
from sahara import context
from sahara.plugins.cdh import abstractversionhandler as avm
@ -27,8 +24,6 @@ from sahara.plugins.cdh.v5_3_0 import validation as vl
conductor = conductor.API
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CU = cu.ClouderaUtilsV530()
@ -71,7 +66,6 @@ class VersionHandler(avm.AbstractVersionHandler):
}
def validate(self, cluster):
CU.validate_cm_api_libs()
vl.validate_cluster_creating(cluster)
def configure_cluster(self, cluster):