1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 Functions for DMaaP integration
26 from jsonschema import validate, ValidationError
27 from dcae_cli.util import reraise_with_msg
28 from dcae_cli.util.logger import get_logger
29 from dcae_cli.catalog.mock.schema import apply_defaults
32 logger = get_logger('Dmaap')
35 "$schema": "http://json-schema.org/draft-04/schema#",
36 "title": "Schema for dmaap inputs",
39 { "$ref": "#/definitions/message_router" },
40 { "$ref": "#/definitions/data_router_publisher" },
41 { "$ref": "#/definitions/data_router_subscriber" }
49 "enum": ["message_router"]
81 "additionalProperties": False
88 "additionalProperties": False
90 "data_router_publisher": {
95 "enum": ["data_router"]
103 "description": "the DCAE location for the publisher, used to set up routing"
107 "description": "the URL to which the publisher makes Data Router publish requests"
112 "description": "the URL from which log data for the feed can be obtained"
117 "description": "the username the publisher uses to authenticate to Data Router"
122 "description": "the password the publisher uses to authenticate to Data Router"
132 "additionalProperties": False
139 "additionalProperties": False
141 "data_router_subscriber": {
146 "enum": ["data_router"]
154 "description": "the DCAE location for the publisher, used to set up routing"
158 "description": "the URL to which the Data Router should deliver files"
163 "description": "the username Data Router uses to authenticate to the subscriber when delivering files"
168 "description": "the username Data Router uses to authenticate to the subscriber when delivering file"
175 "additionalProperties": False
182 "additionalProperties": False
188 _validation_msg = """
189 Is your DMaaP client object a valid json?
190 Does your DMaaP client object follow this format?
195 "aaf_username": <string, optional>,
196 "aaf_password": <string, optional>,
197 "type": "message_router",
199 "client_role": <string, optional>,
200 "client_id": <string, optional>,
201 "location": <string, optional>,
202 "topic_url": <string, required>
206 Data router (publisher):
209 "type": "data_router",
211 "location": <string, optional>,
212 "publish_url": <string, required>,
213 "log_url": <string, optional>,
214 "username": <string, optional>,
215 "password": <string, optional>,
216 "publisher_id": <string, optional>
220 Data router (subscriber):
223 "type": "data_router",
225 "location": <string, optional>,
226 "delivery_url": <string, optional>,
227 "username": <string, optional>,
228 "password": <string, optional>,
229 "subscriber_id": <string, optional>
235 def validate_dmaap_map_schema(dmaap_map):
236 """Validate the dmaap map schema"""
237 for k, v in six.iteritems(dmaap_map):
240 except ValidationError as e:
241 logger.error("DMaaP validation issue with \"{k}\"".format(k=k))
242 logger.error(_validation_msg)
243 reraise_with_msg(e, as_dcae=True)
246 class DMaaPValidationError(RuntimeError):
249 def _find_matching_definition(instance):
250 """Find and return matching definition given an instance"""
251 for subsection in ["message_router", "data_router_publisher",
252 "data_router_subscriber"]:
254 validate(instance, _SCHEMA["definitions"][subsection])
255 return _SCHEMA["definitions"][subsection]
256 except ValidationError:
259 # You should never get here but just in case..
260 logger.error("No matching definition: {0}".format(instance))
261 raise DMaaPValidationError("No matching definition")
263 def apply_defaults_dmaap_map(dmaap_map):
264 """Apply the defaults to the dmaap map"""
265 def grab_properties(instance):
266 return _find_matching_definition(instance)["properties"]
268 return { k: apply_defaults(grab_properties(v), v) for k,v in
269 six.iteritems(dmaap_map) }
272 def validate_dmaap_map_entries(dmaap_map, mr_config_keys, dr_config_keys):
273 """Validate dmaap map entries
275 Validate dmaap map to make sure all config keys are there and that there's
276 no additional config keys beceause this map is used in generating the
281 True when dmaap_map is ok and False when it is not
283 # Catch when there is no dmaap_map when there should be
284 if len(mr_config_keys) + len(dr_config_keys) > 0 and len(dmaap_map) == 0:
285 logger.error("You have dmaap streams defined in your specification")
286 logger.error("You must provide a dmaap json to resolve those dmaap streams.")
287 logger.error("Please use the \"--dmaap-file\" option")
290 # Look for missing keys
291 is_missing = lambda config_key: config_key not in dmaap_map
292 missing_keys = list(filter(is_missing, mr_config_keys))
295 logger.error("Missing config keys in dmaap json: {0}".format(
296 ",".join(missing_keys)))
297 logger.error("Re-edit your dmaap json")
300 # Look for unexpected keys
301 is_unexpected = lambda config_key: config_key not in mr_config_keys
302 unexpected_keys = list(filter(is_unexpected, dmaap_map.keys()))
305 # NOTE: Changed this to a non-error in order to support the feature of
306 # developer having a master dmaap map
307 logger.warn("Unexpected config keys in dmaap json: {0}".format(
308 ",".join(unexpected_keys)))
314 def update_delivery_urls(get_route_func, target_base_url, dmaap_map):
315 """Update delivery urls for dmaap map
317 This method picks out all the data router connections for subscribers and
318 updates the delivery urls with the supplied base target url concatentated
319 with the user specified route (or path).
323 get_route_func (func): Function that takes a config_key and returns the route
324 used for the data router subscriber
325 target_base_url (string): "{http|https}://<hostname>:<port>"
326 dmaap_map (dict): DMaaP map is map of inputs that is config_key to provisioned
327 data router feed or message router topic connection details
331 Returns the updated DMaaP map
333 def update_delivery_url(config_key, dm):
334 route = get_route_func(config_key)
335 dm["dmaap_info"]["delivery_url"] = "{base}{tween}{path}".format(base=target_base_url,
336 path=route, tween="" if route[0] == "/" else "/")
339 def is_dr_subscriber(dm):
340 return dm["type"] == "data_router" and "publish_url" not in dm["dmaap_info"]
342 updated_map = { config_key: update_delivery_url(config_key, dm)
343 for config_key, dm in six.iteritems(dmaap_map) if is_dr_subscriber(dm) }
344 dmaap_map.update(updated_map)
349 def list_delivery_urls(dmaap_map):
350 """List delivery urls
354 List of tuples (config_key, deliery_url)
356 return [(config_key, dm["dmaap_info"]["delivery_url"]) \
357 for config_key, dm in six.iteritems(dmaap_map) if "delivery_url" in dm["dmaap_info"]]