1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
2 # not use this file except in compliance with the License. You may obtain
3 # a copy of the License at
5 # http://www.apache.org/licenses/LICENSE-2.0
7 # Unless required by applicable law or agreed to in writing, software
8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10 # License for the specific language governing permissions and limitations
21 from toscaparser.common.exception import ExceptionCollector
22 from toscaparser.common.exception import URLException
23 from toscaparser.common.exception import ValidationError
24 from toscaparser.imports import ImportsLoader
25 from toscaparser.utils.gettextutils import _
26 from toscaparser.utils.urlutils import UrlUtils
29 from BytesIO import BytesIO
30 except ImportError: # Python 3.x
31 from io import BytesIO
36 def __init__(self, csar_file, a_file=True):
39 self.is_validated = False
40 self.error_caught = False
45 """Validate the provided CSAR file."""
47 self.is_validated = True
49 # validate that the file or URL exists
50 missing_err_msg = (_('"%s" does not exist.') % self.path)
52 if not os.path.isfile(self.path):
53 ExceptionCollector.appendException(
54 ValidationError(message=missing_err_msg))
59 if not UrlUtils.validate_url(self.path):
60 ExceptionCollector.appendException(
61 ValidationError(message=missing_err_msg))
64 response = requests.get(self.path)
65 self.csar = BytesIO(response.content)
67 # validate that it is a valid zip file
68 if not zipfile.is_zipfile(self.csar):
69 err_msg = (_('"%s" is not a valid zip file.') % self.path)
70 ExceptionCollector.appendException(
71 ValidationError(message=err_msg))
74 # validate that it contains the metadata file in the correct location
75 self.zfile = zipfile.ZipFile(self.csar, 'r')
76 filelist = self.zfile.namelist()
77 if 'TOSCA-Metadata/TOSCA.meta' not in filelist:
78 err_msg = (_('"%s" is not a valid CSAR as it does not contain the '
79 'required file "TOSCA.meta" in the folder '
80 '"TOSCA-Metadata".') % self.path)
81 ExceptionCollector.appendException(
82 ValidationError(message=err_msg))
85 # validate that 'Entry-Definitions' property exists in TOSCA.meta
86 data = self.zfile.read('TOSCA-Metadata/TOSCA.meta')
87 invalid_yaml_err_msg = (_('The file "TOSCA-Metadata/TOSCA.meta" in '
88 'the CSAR "%s" does not contain valid YAML '
89 'content.') % self.path)
91 meta = yaml.load(data)
92 if type(meta) is dict:
95 ExceptionCollector.appendException(
96 ValidationError(message=invalid_yaml_err_msg))
98 except yaml.YAMLError:
99 ExceptionCollector.appendException(
100 ValidationError(message=invalid_yaml_err_msg))
103 if 'Entry-Definitions' not in self.metadata:
104 err_msg = (_('The CSAR "%s" is missing the required metadata '
105 '"Entry-Definitions" in '
106 '"TOSCA-Metadata/TOSCA.meta".')
108 ExceptionCollector.appendException(
109 ValidationError(message=err_msg))
112 # validate that 'Entry-Definitions' metadata value points to an
113 # existing file in the CSAR
114 entry = self.metadata.get('Entry-Definitions')
115 if entry and entry not in filelist:
116 err_msg = (_('The "Entry-Definitions" file defined in the '
117 'CSAR "%s" does not exist.') % self.path)
118 ExceptionCollector.appendException(
119 ValidationError(message=err_msg))
122 # validate that external references in the main template actually
123 # exist and are accessible
124 self._validate_external_references()
125 return not self.error_caught
127 def get_metadata(self):
128 """Return the metadata dictionary."""
130 # validate the csar if not already validated
131 if not self.is_validated:
134 # return a copy to avoid changes overwrite the original
135 return dict(self.metadata) if self.metadata else None
137 def _get_metadata(self, key):
138 if not self.is_validated:
140 return self.metadata.get(key)
142 def get_author(self):
143 return self._get_metadata('Created-By')
145 def get_version(self):
146 return self._get_metadata('CSAR-Version')
148 def get_main_template(self):
149 entry_def = self._get_metadata('Entry-Definitions')
150 if entry_def in self.zfile.namelist():
153 def get_main_template_yaml(self):
154 main_template = self.get_main_template()
156 data = self.zfile.read(main_template)
157 invalid_tosca_yaml_err_msg = (
158 _('The file "%(template)s" in the CSAR "%(csar)s" does not '
159 'contain valid TOSCA YAML content.') %
160 {'template': main_template, 'csar': self.path})
162 tosca_yaml = yaml.load(data)
163 if type(tosca_yaml) is not dict:
164 ExceptionCollector.appendException(
165 ValidationError(message=invalid_tosca_yaml_err_msg))
168 ExceptionCollector.appendException(
169 ValidationError(message=invalid_tosca_yaml_err_msg))
171 def get_description(self):
172 desc = self._get_metadata('Description')
176 self.metadata['Description'] = \
177 self.get_main_template_yaml().get('description')
178 return self.metadata['Description']
180 def decompress(self):
181 if not self.is_validated:
183 # Although in python this works well - jython insists on getting a directory here, not a file:
184 #self.temp_dir = tempfile.NamedTemporaryFile().name
185 self.temp_dir = tempfile.mkdtemp()
187 with zipfile.ZipFile(self.csar, "r") as zf:
188 zf.extractall(self.temp_dir)
190 def _validate_external_references(self):
191 """Extracts files referenced in the main template
193 These references are currently supported:
195 * interface implementations
200 main_tpl_file = self.get_main_template()
201 if not main_tpl_file:
203 main_tpl = self.get_main_template_yaml()
205 if 'imports' in main_tpl:
206 ImportsLoader(main_tpl['imports'],
207 os.path.join(self.temp_dir, main_tpl_file))
209 if 'topology_template' in main_tpl:
210 topology_template = main_tpl['topology_template']
212 if 'node_templates' in topology_template:
213 node_templates = topology_template['node_templates']
215 for node_template_key in node_templates:
216 node_template = node_templates[node_template_key]
217 if 'artifacts' in node_template:
218 artifacts = node_template['artifacts']
219 for artifact_key in artifacts:
220 artifact = artifacts[artifact_key]
221 if isinstance(artifact, six.string_types):
222 self._validate_external_reference(
225 elif isinstance(artifact, dict):
226 if 'file' in artifact:
227 self._validate_external_reference(
231 ExceptionCollector.appendException(
232 ValueError(_('Unexpected artifact '
233 'definition for "%s".')
235 self.error_caught = True
236 if 'interfaces' in node_template:
237 interfaces = node_template['interfaces']
238 for interface_key in interfaces:
239 interface = interfaces[interface_key]
240 for opertation_key in interface:
241 operation = interface[opertation_key]
242 if isinstance(operation, six.string_types):
243 self._validate_external_reference(
247 elif isinstance(operation, dict):
248 if 'implementation' in operation:
249 self._validate_external_reference(
251 operation['implementation'])
254 shutil.rmtree(self.temp_dir, False, self._printPath)
257 def _printPath(self, func, path, exc_info):
258 print('Could not delete: ' + path)
261 def _validate_external_reference(self, tpl_file, resource_file,
263 """Verify that the external resource exists
265 If resource_file is a URL verify that the URL is valid.
266 If resource_file is a relative path verify that the path is valid
267 considering base folder (self.temp_dir) and tpl_file.
268 Note that in a CSAR resource_file cannot be an absolute path.
270 if UrlUtils.validate_url(resource_file):
271 msg = (_('The resource at "%s" cannot be accessed.') %
274 if UrlUtils.url_accessible(resource_file):
277 ExceptionCollector.appendException(
278 URLException(what=msg))
279 self.error_caught = True
281 ExceptionCollector.appendException(
282 URLException(what=msg))
283 self.error_caught = True
285 if os.path.isfile(os.path.join(self.temp_dir,
286 os.path.dirname(tpl_file),
291 ExceptionCollector.appendException(
292 ValueError(_('The resource "%s" does not exist.')
294 self.error_caught = True