1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 from aria.parser.validation import Issue
17 from aria.utils.collections import (deepcopy_with_locators, OrderedDict)
19 from .parameters import (convert_parameter_definitions_to_values, validate_required_values,
20 coerce_parameter_value)
21 from .interfaces import (convert_requirement_interface_definitions_from_type_to_raw_template,
22 merge_interface_definitions, merge_interface, validate_required_inputs)
def get_inherited_requirement_definitions(context, presentation):
    """
    Returns our requirement definitions added on top of those of our parent, if we have one
    (recursively).

    Allows overriding requirement definitions if they have the same name.

    :param context: passed through to ``presentation._get_parent``
    :param presentation: object exposing ``_get_parent(context)`` and ``requirements`` (an
     iterable of ``(name, definition)`` pairs, or a false value when there are none)
    :returns: list of ``(name, definition)`` tuples; a definition that overrides an earlier one of
     the same name takes the position at which it was (re-)declared
    """

    parent = presentation._get_parent(context)
    requirement_definitions = get_inherited_requirement_definitions(context, parent) \
        if parent is not None else []

    our_requirement_definitions = presentation.requirements
    if our_requirement_definitions:
        for requirement_name, our_requirement_definition in our_requirement_definitions:
            # Drop existing requirement definitions of this name. We build a new list instead of
            # calling remove() on the list while iterating over it, which can skip elements.
            requirement_definitions = [
                (name, requirement_definition)
                for name, requirement_definition in requirement_definitions
                if name != requirement_name]

            requirement_definitions.append((requirement_name, our_requirement_definition))

    return requirement_definitions
def get_template_requirements(context, presentation):
    """
    Returns our requirements added on top of those of the node type if they exist there.

    If the requirement has a relationship, the relationship properties and interfaces are assigned.

    Returns the assigned property, interface input, and interface operation input values while
    making sure they are defined in our type. Default values, if available, will be used if we did
    not assign them. Also makes sure that required properties and inputs indeed end up with a value.

    After the assignments are collected, the number of assignments per requirement definition is
    checked against the definition's ``occurrences``; violations are reported through
    ``context.validation.report``.

    :param context: provides ``validation.report`` and is passed to the presentation helpers
    :param presentation: the node template; must expose ``_get_type``, ``requirements``,
     ``_fullname`` and ``_locator``
    :returns: list of ``(requirement_name, requirement_assignment)`` tuples
    """

    requirement_assignments = []

    the_type = presentation._get_type(context) # NodeType
    requirement_definitions = the_type._get_requirements(context) if the_type is not None else None

    # Add our requirement assignments
    our_requirement_assignments = presentation.requirements
    if our_requirement_assignments:
        add_requirement_assignments(context, presentation, requirement_assignments,
                                    requirement_definitions, our_requirement_assignments)

    # Validate occurrences
    if requirement_definitions:
        for requirement_name, requirement_definition in requirement_definitions:
            # Allowed occurrences (None when the definition does not specify them)
            allowed_occurrences = requirement_definition.occurrences

            # Count actual occurrences (includes assignments auto-added in earlier iterations)
            actual_occurrences = sum(1 for name, _ in requirement_assignments
                                     if name == requirement_name)

            if allowed_occurrences is None:
                # If not specified, we interpret this to mean that exactly 1 occurrence is required
                if actual_occurrences == 0:
                    # If it's not there, we will automatically add it (this behavior is not in the
                    # TOSCA spec, but seems implied)
                    requirement_assignment, \
                    relationship_property_definitions, \
                    relationship_interface_definitions = \
                        convert_requirement_from_definition_to_assignment(context,
                                                                          requirement_definition,
                                                                          None, presentation)
                    validate_requirement_assignment(context, presentation, requirement_assignment,
                                                    relationship_property_definitions,
                                                    relationship_interface_definitions)
                    requirement_assignments.append((requirement_name, requirement_assignment))
                elif actual_occurrences > 1:
                    context.validation.report(
                        'requirement "%s" is allowed only one occurrence in "%s": %d'
                        % (requirement_name, presentation._fullname, actual_occurrences),
                        locator=presentation._locator, level=Issue.BETWEEN_TYPES)
            else:
                if not allowed_occurrences.is_in(actual_occurrences):
                    if allowed_occurrences.value[1] == 'UNBOUNDED':
                        context.validation.report(
                            'requirement "%s" does not have at least %d occurrences in "%s": has %d'
                            % (requirement_name, allowed_occurrences.value[0],
                               presentation._fullname, actual_occurrences),
                            locator=presentation._locator, level=Issue.BETWEEN_TYPES)
                    else:
                        context.validation.report(
                            'requirement "%s" is allowed between %d and %d occurrences in "%s":'
                            ' has %d'
                            % (requirement_name, allowed_occurrences.value[0],
                               allowed_occurrences.value[1], presentation._fullname,
                               actual_occurrences),
                            locator=presentation._locator, level=Issue.BETWEEN_TYPES)

    return requirement_assignments
def convert_requirement_from_definition_to_assignment(context, requirement_definition, # pylint: disable=too-many-branches
                                                      our_requirement_assignment, container):
    """
    Creates a requirement assignment out of a requirement definition, taking the relationship
    declared at our own assignment into account if there is one.

    The raw assignment always gets a ``capability`` entry, a ``node`` entry if the definition
    resolves to a node type, and a ``relationship`` entry (``type``, and where available
    ``properties`` and ``interfaces``) if a relationship type could be determined from either our
    assignment or the definition.

    :param context: provides ``validation.report`` and is passed to the presentation helpers
    :param requirement_definition: the requirement definition at the node type
    :param our_requirement_assignment: our own assignment for this requirement, or ``None``
    :param container: used as the container of the new assignment and as the locator source for
     reported issues
    :returns: tuple of ``(requirement_assignment, relationship_property_definitions,
     relationship_interface_definitions)``; the last two are ``None`` when no relationship type
     could be determined
    """

    from ..assignments import RequirementAssignment

    raw = OrderedDict()

    # Capability type name:
    raw['capability'] = deepcopy_with_locators(requirement_definition.capability)

    node_type = requirement_definition._get_node_type(context)
    if node_type is not None:
        raw['node'] = deepcopy_with_locators(node_type._name)

    relationship_type = None
    relationship_template = None
    relationship_property_definitions = None
    relationship_interface_definitions = None

    # First try to find the relationship if we declared it
    # RelationshipAssignment:
    our_relationship = our_requirement_assignment.relationship \
        if our_requirement_assignment is not None else None
    if our_relationship is not None:
        relationship_type, relationship_type_variant = our_relationship._get_type(context)
        if relationship_type_variant == 'relationship_template':
            # We were pointed at a template; the actual type is the template's type
            relationship_template = relationship_type
            relationship_type = relationship_template._get_type(context)

    definition_relationship_type = None
    relationship_definition = requirement_definition.relationship # RelationshipDefinition
    if relationship_definition is not None:
        definition_relationship_type = relationship_definition._get_type(context)

    # If not exists, try at the node type
    if relationship_type is None:
        relationship_type = definition_relationship_type
    elif definition_relationship_type is not None:
        # Make sure the type is derived (there is nothing to compare against when the definition
        # declares no relationship type, so that case is skipped instead of dereferencing None)
        if not definition_relationship_type._is_descendant(context, relationship_type):
            context.validation.report(
                'assigned relationship type "%s" is not a descendant of declared relationship type'
                ' "%s"'
                % (relationship_type._name, definition_relationship_type._name),
                locator=container._locator, level=Issue.BETWEEN_TYPES)

    if relationship_type is not None:
        raw['relationship'] = OrderedDict()

        type_name = our_relationship.type if our_relationship is not None else None
        if type_name is None:
            type_name = relationship_type._name

        raw['relationship']['type'] = deepcopy_with_locators(type_name)

        # These are our property definitions
        relationship_property_definitions = relationship_type._get_properties(context)

        if relationship_template is not None:
            # Property values from template
            raw['relationship']['properties'] = relationship_template._get_property_values(context)
        else:
            if relationship_property_definitions:
                # Convert property definitions to values
                raw['relationship']['properties'] = \
                    convert_parameter_definitions_to_values(context,
                                                            relationship_property_definitions)

        # These are our interface definitions
        # InterfaceDefinition:
        relationship_interface_definitions = OrderedDict(relationship_type._get_interfaces(context))

        # Convert interface definitions to templates
        convert_requirement_interface_definitions_from_type_to_raw_template(
            context,
            raw['relationship'],
            relationship_interface_definitions)

        if relationship_definition:
            # Merge extra interface definitions
            # InterfaceDefinition:
            definition_interface_definitions = relationship_definition.interfaces
            merge_interface_definitions(context, relationship_interface_definitions,
                                        definition_interface_definitions, requirement_definition,
                                        container)

        if relationship_template is not None:
            # Interfaces from template
            interfaces = relationship_template._get_interfaces(context)
            if interfaces:
                raw['relationship']['interfaces'] = OrderedDict()
                for interface_name, interface in interfaces.iteritems():
                    raw['relationship']['interfaces'][interface_name] = interface._raw

    return \
        RequirementAssignment(name=requirement_definition._name, raw=raw, container=container), \
        relationship_property_definitions, \
        relationship_interface_definitions
def add_requirement_assignments(context, presentation, requirement_assignments,
                                requirement_definitions, our_requirement_assignments):
    """
    Converts, merges and validates each of our requirement assignments against its definition at
    the node type, appending the results to ``requirement_assignments``.

    Assignments whose name has no matching definition are not added; an issue is reported for them
    instead.

    :param context: provides ``validation.report`` and is passed to the helpers
    :param presentation: the node template owning the assignments
    :param requirement_assignments: list that receives ``(name, requirement_assignment)`` tuples
     (modified in place)
    :param requirement_definitions: iterable of ``(name, definition)`` pairs, or ``None``
    :param our_requirement_assignments: iterable of ``(name, assignment)`` pairs
    """

    for requirement_name, our_requirement_assignment in our_requirement_assignments:
        requirement_definition = get_first_requirement(requirement_definitions, requirement_name)
        if requirement_definition is not None:
            requirement_assignment, \
            relationship_property_definitions, \
            relationship_interface_definitions = \
                convert_requirement_from_definition_to_assignment(context, requirement_definition,
                                                                  our_requirement_assignment,
                                                                  presentation)
            merge_requirement_assignment(context,
                                         relationship_property_definitions,
                                         relationship_interface_definitions,
                                         requirement_assignment, our_requirement_assignment)
            # Issues are reported against our relationship if we declared one, otherwise against
            # our requirement assignment
            validate_requirement_assignment(context,
                                            our_requirement_assignment.relationship
                                            or our_requirement_assignment,
                                            requirement_assignment,
                                            relationship_property_definitions,
                                            relationship_interface_definitions)
            requirement_assignments.append((requirement_name, requirement_assignment))
        else:
            context.validation.report('requirement "%s" not declared at node type "%s" in "%s"'
                                      % (requirement_name, presentation.type,
                                         presentation._fullname),
                                      locator=our_requirement_assignment._locator,
                                      level=Issue.BETWEEN_TYPES)
def merge_requirement_assignment(context, relationship_property_definitions,
                                 relationship_interface_definitions, requirement, our_requirement):
    """
    Overlays the values we assigned in ``our_requirement`` onto the raw data of ``requirement``.

    ``capability``, ``node`` and ``node_filter`` are deep-copied over when we set them. Relationship
    properties and interfaces are merged only when we declared a relationship without naming a
    type.

    :param context: passed through to the relationship merge
    :param relationship_property_definitions: passed through to the relationship merge
    :param relationship_interface_definitions: passed through to the relationship merge
    :param requirement: the requirement assignment whose ``_raw`` is updated in place
    :param our_requirement: our own requirement assignment
    """

    capability = our_requirement.capability
    if capability is not None:
        requirement._raw['capability'] = deepcopy_with_locators(capability)

    node = our_requirement.node
    if node is not None:
        requirement._raw['node'] = deepcopy_with_locators(node)

    node_filter = our_requirement.node_filter
    if node_filter is not None:
        requirement._raw['node_filter'] = deepcopy_with_locators(node_filter._raw)

    relationship = our_requirement.relationship # RelationshipAssignment
    if (relationship is None) or (relationship.type is not None):
        # Nothing to merge: either no relationship, or it names its own type
        return

    # Make sure we have a dict
    if 'relationship' not in requirement._raw:
        requirement._raw['relationship'] = OrderedDict()

    merge_requirement_assignment_relationship(context, relationship,
                                              relationship_property_definitions,
                                              relationship_interface_definitions,
                                              requirement, relationship)
def merge_requirement_assignment_relationship(context, presentation, property_definitions,
                                              interface_definitions, requirement, our_relationship):
    """
    Merges the properties and interfaces we assigned at ``our_relationship`` into
    ``requirement._raw['relationship']``, which must already exist.

    Properties and interfaces that are not declared in the given definitions are not merged; an
    issue is reported for each of them instead.

    :param context: provides ``validation.report`` and is passed to the helpers
    :param presentation: passed to the coercion and interface merge helpers; its
     ``_container._container._fullname`` is used in reported messages
    :param property_definitions: mapping of property name to definition, or ``None``
    :param interface_definitions: mapping of interface name to definition, or ``None``
    :param requirement: the requirement assignment being updated in place
    :param our_relationship: our own relationship assignment
    """

    our_relationship_properties = our_relationship._raw.get('properties')
    if our_relationship_properties:
        # Make sure we have a dict
        if 'properties' not in requirement._raw['relationship']:
            requirement._raw['relationship']['properties'] = OrderedDict()

        # Merge our properties
        for property_name, prop in our_relationship_properties.iteritems():
            # The definitions may be None (no relationship type was found); treat that the same
            # as "not declared", consistent with the interface handling below
            if (property_definitions is not None) and (property_name in property_definitions):
                definition = property_definitions[property_name]
                requirement._raw['relationship']['properties'][property_name] = \
                    coerce_parameter_value(context, presentation, definition, prop)
            else:
                context.validation.report(
                    'relationship property "%s" not declared at definition of requirement "%s"'
                    ' in "%s"'
                    % (property_name, requirement._fullname,
                       presentation._container._container._fullname),
                    locator=our_relationship._get_child_locator('properties', property_name),
                    level=Issue.BETWEEN_TYPES)

    our_interfaces = our_relationship.interfaces
    if our_interfaces:
        # Make sure we have a dict
        if 'interfaces' not in requirement._raw['relationship']:
            requirement._raw['relationship']['interfaces'] = OrderedDict()

        # Merge interfaces
        for interface_name, our_interface in our_interfaces.iteritems():
            if interface_name not in requirement._raw['relationship']['interfaces']:
                requirement._raw['relationship']['interfaces'][interface_name] = OrderedDict()

            if (interface_definitions is not None) and (interface_name in interface_definitions):
                interface_definition = interface_definitions[interface_name]
                interface_assignment = requirement.relationship.interfaces[interface_name]
                merge_interface(context, presentation, interface_assignment, our_interface,
                                interface_definition, interface_name)
            else:
                context.validation.report(
                    'relationship interface "%s" not declared at definition of requirement "%s"'
                    ' in "%s"'
                    % (interface_name, requirement._fullname,
                       presentation._container._container._fullname),
                    locator=our_relationship._locator, level=Issue.BETWEEN_TYPES)
def validate_requirement_assignment(context, presentation, requirement_assignment,
                                    relationship_property_definitions,
                                    relationship_interface_definitions):
    """
    Validates the required relationship property values and interface inputs of a requirement
    assignment. Does nothing if the assignment has no relationship.

    :param context: passed to the validation helpers
    :param presentation: passed to the validation helpers
    :param requirement_assignment: the requirement assignment to validate
    :param relationship_property_definitions: property definitions of the relationship
    :param relationship_interface_definitions: mapping of interface name to interface definition,
     or a false value when there are none
    """

    relationship = requirement_assignment.relationship
    if relationship is None:
        # Nothing to validate without a relationship
        return

    validate_required_values(context, presentation, relationship.properties,
                             relationship_property_definitions)

    if relationship_interface_definitions:
        for interface_name, relationship_interface_definition \
                in relationship_interface_definitions.iteritems():
            # The interface may not have been assigned at all
            interface_assignment = relationship.interfaces.get(interface_name) \
                if relationship.interfaces is not None else None
            validate_required_inputs(context, presentation, interface_assignment,
                                     relationship_interface_definition, None, interface_name)
def get_first_requirement(requirement_definitions, name):
    """
    Returns the first requirement definition with the given name.

    :param requirement_definitions: iterable of ``(name, definition)`` pairs, or ``None``
    :param name: requirement name to look for
    :returns: the matching definition, or ``None`` if there is no match or no definitions
    """

    if requirement_definitions is None:
        return None

    matches = (definition for candidate, definition in requirement_definitions
               if candidate == name)
    return next(matches, None)