from jsonschema import validate, ValidationError
import requests
-from dcae_cli.util import reraise_with_msg
+from dcae_cli.util import reraise_with_msg, fetch_file_from_web
from dcae_cli.util import config as cli_config
from dcae_cli.util.exc import DcaeException
from dcae_cli.util.logger import get_logger
# c) Both
#
-# TODO: Source this from app's configuration [ONAP URL TBD]
-_nexus_uri = "http://make-me-valid"
class FetchSchemaError(RuntimeError):
pass
-def _fetch_schema_from_web(server_uri, schema_path):
+def _fetch_schema(schema_path):
try:
- schema_url = "{0}/{1}".format(server_uri, schema_path)
- r = requests.get(schema_url)
- r.raise_for_status()
- return json.loads(r.text)
+ server_url = cli_config.get_server_url()
+ return fetch_file_from_web(server_url, schema_path)
except requests.HTTPError as e:
raise FetchSchemaError("HTTP error from fetching schema", e)
except Exception as e:
raise FetchSchemaError("Unexpected error from fetching schema", e)
-_fetch_schema_from_nexus = partial(_fetch_schema_from_web, _nexus_uri)
-
def _safe_dict(obj):
'''Returns a dict from a dict or json string'''
except FetchSchemaError as e:
reraise_with_msg(e, as_dcae=True)
-_validate_using_nexus = partial(_validate, _fetch_schema_from_nexus)
+_validate_using_nexus = partial(_validate, _fetch_schema)
_path_component_spec = cli_config.get_path_component_spec()
"""
# Apply health check defaults
healthcheck_type = config["healthcheck"]["type"]
- component_spec = _fetch_schema_from_nexus(_path_component_spec)
+ component_spec = _fetch_schema(_path_component_spec)
if healthcheck_type in ["http", "https"]:
apply_defaults_func = partial(apply_defaults,