Add validation of external resources for CSAR archives

Add support for validation of external resources specified in
imports, interfaces, and artifacts sections of the main yaml
template in CSAR archives.

Change-Id: I7dc6875cb8117ca56066552ec7d23ce98621f31d
Partially-Implements: blueprint tosca-csar-translation
This commit is contained in:
Vahid Hashemian
2015-09-29 16:15:22 -07:00
parent 30603c67a9
commit af2a43f946
10 changed files with 168 additions and 8 deletions

View File

@@ -12,11 +12,14 @@
import os.path
import requests
import six
import tempfile
import yaml
import zipfile
from toscaparser.common.exception import URLException
from toscaparser.common.exception import ValidationError
from toscaparser.imports import ImportsLoader
from toscaparser.utils.gettextutils import _
from toscaparser.utils.urlutils import UrlUtils
@@ -93,6 +96,10 @@ class CSAR(object):
'"%s" does not exist.') % self.csar_file)
raise ValidationError(message=err_msg)
# validate that external references in the main template actually exist
# and are accessible
self._validate_external_references()
def get_metadata(self):
"""Return the metadata dictionary."""
@@ -148,3 +155,90 @@ class CSAR(object):
with zipfile.ZipFile(self.csar_file, "r") as zf:
zf.extractall(folder)
return folder
def _validate_external_references(self):
"""Extracts files referenced in the main template
These references are currently supported:
* imports
* interface implementations
* artifacts
"""
temp_dir = self.decompress()
main_tpl_file = self.get_main_template()
main_tpl = self.get_main_template_yaml()
if 'imports' in main_tpl:
ImportsLoader(main_tpl['imports'],
os.path.join(temp_dir, main_tpl_file))
if 'topology_template' not in main_tpl:
return
topology_template = main_tpl['topology_template']
if 'node_templates' not in topology_template:
return
node_templates = topology_template['node_templates']
for node_template_key in node_templates:
node_template = node_templates[node_template_key]
if 'artifacts' in node_template:
artifacts = node_template['artifacts']
for artifact_key in artifacts:
artifact = artifacts[artifact_key]
if isinstance(artifact, six.string_types):
self._validate_external_reference(temp_dir,
main_tpl_file,
artifact)
elif isinstance(artifact, dict):
if 'file' in artifact:
self._validate_external_reference(temp_dir,
main_tpl_file,
artifact['file'])
else:
raise ValueError(_('Unexpected artifact definition '
'for %s.') % artifact_key)
if 'interfaces' in node_template:
interfaces = node_template['interfaces']
for interface_key in interfaces:
interface = interfaces[interface_key]
for opertation_key in interface:
operation = interface[opertation_key]
if isinstance(operation, six.string_types):
self._validate_external_reference(temp_dir,
main_tpl_file,
operation,
False)
elif isinstance(operation, dict):
if 'implementation' in operation:
self._validate_external_reference(
temp_dir, main_tpl_file,
operation['implementation'])
def _validate_external_reference(self, base_dir, tpl_file, resource_file,
raise_exc=True):
"""Verify that the external resource exists
If resource_file is a URL verify that the URL is valid.
If resource_file is a relative path verify that the path is valid
considering base_dir and tpl_file.
Note that in a CSAR resource_file cannot be an absolute path.
"""
if UrlUtils.validate_url(resource_file):
msg = (_('The resource at %s cannot be accessed') % resource_file)
try:
if UrlUtils.url_accessible(resource_file):
return
else:
raise URLException(what=msg)
except Exception:
raise URLException(what=msg)
if os.path.isfile(os.path.join(base_dir,
os.path.dirname(tpl_file),
resource_file)):
return
if raise_exc:
raise ValueError(_('The resource %s does not exist.')
% resource_file)

View File

@@ -37,9 +37,9 @@ topology_template:
- database_endpoint: mysql_database
interfaces:
Standard:
create: Scripts/WordPress/install.sh
create: ../Scripts/WordPress/install.sh
configure:
implementation: Scripts/WordPress/configure.sh
implementation: ../Scripts/WordPress/configure.sh
inputs:
wp_db_name: { get_property: [ mysql_database, name ] }
wp_db_user: { get_property: [ mysql_database, user ] }
@@ -56,7 +56,7 @@ topology_template:
interfaces:
Standard:
configure:
implementation: Scripts/MYSQLDatabase/configure.sh
implementation: ../Scripts/MYSQLDatabase/configure.sh
inputs:
db_name: { get_property: [ SELF, name ] }
db_user: { get_property: [ SELF, user ] }
@@ -72,10 +72,10 @@ topology_template:
- host: server
interfaces:
Standard:
create: Scripts/MYSQLDBMS/install.sh
start: Scripts/MYSQLDBMS/start.sh
create: ../Scripts/MYSQLDBMS/install.sh
start: ../Scripts/MYSQLDBMS/start.sh
configure:
implementation: Scripts/MYSQLDBMS/configure.sh
implementation: ../Scripts/MYSQLDBMS/configure.sh
inputs:
root_password: { get_property: [ mysql_dbms, root_password ] }
@@ -85,8 +85,8 @@ topology_template:
- host: server
interfaces:
Standard:
create: Scripts/WebServer/install.sh
start: Scripts/WebServer/start.sh
create: ../Scripts/WebServer/install.sh
start: ../Scripts/WebServer/start.sh
server:
type: tosca.nodes.Compute

View File

@@ -13,6 +13,7 @@
import os
import zipfile
from toscaparser.common.exception import URLException
from toscaparser.common.exception import ValidationError
from toscaparser.prereq.csar import CSAR
from toscaparser.tests.base import TestCase
@@ -81,11 +82,60 @@ class CSARPrereqTest(TestCase):
self.assertEqual(_('The "Entry-Definitions" file defined in the CSAR '
'"%s" does not exist.') % path, str(error))
def test_csar_invalid_import_path(self):
path = os.path.join(self.base_path,
"data/CSAR/csar_wordpress_invalid_import_path.zip")
csar = CSAR(path)
error = self.assertRaises(ImportError, csar.validate)
self.assertEqual(_('Import Definitions/wordpress.yaml is not valid'),
str(error))
def test_csar_invalid_import_url(self):
path = os.path.join(self.base_path,
"data/CSAR/csar_wordpress_invalid_import_url.zip")
csar = CSAR(path)
error = self.assertRaises(URLException, csar.validate)
self.assertEqual(_('URLException "Failed to reach server '
'https://raw.githubusercontent.com/openstack/'
'tosca-parser/master/toscaparser/tests/data/CSAR/'
'tosca_single_instance_wordpress/Definitions/'
'wordpress1.yaml. Reason is : Not Found".'),
str(error))
def test_csar_invalid_script_path(self):
path = os.path.join(self.base_path,
"data/CSAR/csar_wordpress_invalid_script_path.zip")
csar = CSAR(path)
error = self.assertRaises(ValueError, csar.validate)
self.assertTrue(
str(error) == _('The resource Scripts/WordPress/install.sh does '
'not exist.') or
str(error) == _('The resource Scripts/WordPress/configure.sh does '
'not exist.'))
def test_csar_invalid_script_url(self):
path = os.path.join(self.base_path,
"data/CSAR/csar_wordpress_invalid_script_url.zip")
csar = CSAR(path)
error = self.assertRaises(URLException, csar.validate)
self.assertEqual(_('URLException "The resource at '
'https://raw.githubusercontent.com/openstack/'
'tosca-parser/master/toscaparser/tests/data/CSAR/'
'tosca_single_instance_wordpress/Scripts/WordPress/'
'install1.sh cannot be accessed".'),
str(error))
def test_valid_csar(self):
path = os.path.join(self.base_path, "data/CSAR/csar_hello_world.zip")
csar = CSAR(path)
self.assertIsNone(csar.validate())
def test_valid_csar_with_url_import_and_script(self):
path = os.path.join(self.base_path, "data/CSAR/csar_wordpress_with_url"
"_import_and_script.zip")
csar = CSAR(path)
self.assertIsNone(csar.validate())
def test_metadata_invalid_csar(self):
path = os.path.join(self.base_path,
"data/CSAR/csar_metadata_not_yaml.zip")

View File

@@ -15,6 +15,13 @@ from six.moves.urllib.parse import urljoin
from six.moves.urllib.parse import urlparse
from toscaparser.utils.gettextutils import _
try:
# Python 3.x
import urllib.request as urllib
except ImportError:
# Python 2.x
import urllib
class UrlUtils(object):
@@ -41,3 +48,12 @@ class UrlUtils(object):
if not UrlUtils.validate_url(url):
raise ValueError(_("Provided URL is invalid."))
return urljoin(url, relative_path)
@staticmethod
def url_accessible(url):
"""Validates whether the given URL is accessible.
Returns true if the get call returns a 200 response code.
Otherwise, returns false.
"""
return urllib.urlopen(url).getcode() == 200