The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
+## [1.1.1] - 2022/08/05
+ * DCAEGEN2-3170 - CodeCoverage improvement for dcaegen2-platform-mod-distributorapi
+ * pin MarkupSafe to 2.0.1 because Django+Jinja uses a slightly older API
+ * run black on mod/distributorapi
+
+## [1.1.0] - 2020/08/19
+ * DCAEGEN2-2292 - ONAP must complete update of the Python language (from 2.7 -> 3.8)
+
## [1.0.1] - 2020/3/26
* Run as non-root
+
## [1.0.0] - 2019/11/11
* Distributor API - initial version
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
except KeyError:
raise errors.DistributorAPIConfigError("Required environment variable missing: {0}".format(name))
+
def init():
global nifi_registry_url
- nifi_registry_url = _grab_env("NIFI_REGISTRY_URL"
- , default="http://nifi-registry:18080/nifi-registry-api")
+ nifi_registry_url = _grab_env("NIFI_REGISTRY_URL", default="http://nifi-registry:18080/nifi-registry-api")
global onboarding_api_url
- onboarding_api_url = _grab_env("ONBOARDING_API_URL"
- , default="http://onboarding-api:8080/onboarding")
+ onboarding_api_url = _grab_env("ONBOARDING_API_URL", default="http://onboarding-api:8080/onboarding")
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
def get_distribution_target(ds_id):
global _cache
- result = [ i for i in _cache if i["dt_id"] == ds_id ]
+ result = [i for i in _cache if i["dt_id"] == ds_id]
return result[0] if result else {}
+
def transform_request(req):
"""Transform request to object to store
req["processGroups"] = []
return req
+
def add_distribution_target(dt):
global _cache
_cache.append(dt)
dt["modified"] = datetime.utcnow().isoformat()
return dt
+
def update_distribution_target(updated_dt):
dt_id = updated_dt["dt_id"]
global _cache
dt["processGroups"].append(process_group)
return process_group
return None
-
-
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# ============LICENSE_END=========================================================
"""Errors"""
+
class DistributorAPIError(RuntimeError):
pass
+
class DistributorAPIConfigError(DistributorAPIError):
pass
+
class DistributorAPIResourceNotFound(DistributorAPIError):
pass
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
CORS(_app)
# Try to bundle as many errors together
# https://flask-restplus.readthedocs.io/en/stable/parsing.html#error-handling
-_app.config['BUNDLE_ERRORS'] = True
-_api = Api(_app, version=__version__, title="Distributor HTTP API",
- description="HTTP API to manage distribution targets for DCAE design. Distribution targets are DCAE runtime environments that have been registered and are enabled to accept flow design changes that are to be orchestrated in that environment",
- contact="", default_mediatype="application/json"
- , prefix="/distributor", doc="/distributor", default="distributor"
- )
+_app.config["BUNDLE_ERRORS"] = True
+_api = Api(
+ _app,
+ version=__version__,
+ title="Distributor HTTP API",
+ description="HTTP API to manage distribution targets for DCAE design. Distribution targets are DCAE runtime environments that have been registered and are enabled to accept flow design changes that are to be orchestrated in that environment",
+ contact="",
+ default_mediatype="application/json",
+ prefix="/distributor",
+ doc="/distributor",
+ default="distributor",
+)
# REVIEW: Do I need a namespace?
ns = _api
-model_pg = _api.model("ProcessGroup", {
- "id": fields.String(required=True, description="Id for this process group"
- , attribute="processGroupId")
- , "version": fields.Integer(required=True
- , description="Version of the process group")
- , "processed": fields.DateTime(required=True
- , description="When this process group was processed by this API")
- , "runtimeResponse": fields.String(required=True
- , description="Full response from the runtime API")
- })
-
-model_dt = _api.model("DistributionTarget", {
- "selfUrl": fields.Url("resource_distribution_target", absolute=True)
- , "id": fields.String(required=True, description="Id for this distribution target"
- , attribute="dt_id")
- , "name": fields.String(required=True, description="Name for this distribution target"
- , attribute="name")
- , "runtimeApiUrl": fields.String(required=True
- , description="Url to the runtime API for this distribution target"
- , attribute="runtimeApiUrl")
- , "description": fields.String(required=False
- , description="Description for this distribution target"
- , attribute="description")
- , "nextDistributionTargetId": fields.String(required=False
- , description="Id to the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order."
- , attribute="nextDistributionTargetId")
- , "created": fields.String(required=True
- , description="When this distribution target was created in UTC"
- , attribute="created")
- , "modified": fields.String(required=True
- , description="When this distribution target was last modified in UTC"
- , attribute="modified")
- , "processGroups": fields.List(fields.Nested(model_pg))
- })
-
-model_dts = _api.model("DistributionTargets", {
- "distributionTargets": fields.List(fields.Nested(model_dt))
- })
+model_pg = _api.model(
+ "ProcessGroup",
+ {
+ "id": fields.String(required=True, description="Id for this process group", attribute="processGroupId"),
+ "version": fields.Integer(required=True, description="Version of the process group"),
+ "processed": fields.DateTime(required=True, description="When this process group was processed by this API"),
+ "runtimeResponse": fields.String(required=True, description="Full response from the runtime API"),
+ },
+)
+
+model_dt = _api.model(
+ "DistributionTarget",
+ {
+ "selfUrl": fields.Url("resource_distribution_target", absolute=True),
+ "id": fields.String(required=True, description="Id for this distribution target", attribute="dt_id"),
+ "name": fields.String(required=True, description="Name for this distribution target", attribute="name"),
+ "runtimeApiUrl": fields.String(
+ required=True, description="Url to the runtime API for this distribution target", attribute="runtimeApiUrl"
+ ),
+ "description": fields.String(
+ required=False, description="Description for this distribution target", attribute="description"
+ ),
+ "nextDistributionTargetId": fields.String(
+ required=False,
+ description="Id to the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order.",
+ attribute="nextDistributionTargetId",
+ ),
+ "created": fields.String(
+ required=True, description="When this distribution target was created in UTC", attribute="created"
+ ),
+ "modified": fields.String(
+ required=True, description="When this distribution target was last modified in UTC", attribute="modified"
+ ),
+ "processGroups": fields.List(fields.Nested(model_pg)),
+ },
+)
+
+model_dts = _api.model("DistributionTargets", {"distributionTargets": fields.List(fields.Nested(model_dt))})
parser_dt_req = ns.parser()
-parser_dt_req.add_argument("name", required=True, trim=True,
- location="json", help="Name for this new distribution target")
-parser_dt_req.add_argument("runtimeApiUrl", required=True, trim=True,
- location="json", help="Url to the runtime API for this distribution target")
-parser_dt_req.add_argument("description", required=False, trim=True,
- location="json", help="Description for this distribution target")
-parser_dt_req.add_argument("nextDistributionTargetId", required=False, trim=True,
- location="json", help="Id of the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order.")
+parser_dt_req.add_argument(
+ "name", required=True, trim=True, location="json", help="Name for this new distribution target"
+)
+parser_dt_req.add_argument(
+ "runtimeApiUrl",
+ required=True,
+ trim=True,
+ location="json",
+ help="Url to the runtime API for this distribution target",
+)
+parser_dt_req.add_argument(
+ "description", required=False, trim=True, location="json", help="Description for this distribution target"
+)
+parser_dt_req.add_argument(
+ "nextDistributionTargetId",
+ required=False,
+ trim=True,
+ location="json",
+ help="Id of the next distribution target. Distribution targets can be linked together and have a progression order. Specifying the id of the next distribution target defines the next element int the order.",
+)
@ns.route("/distribution-targets", endpoint="resource_distribution_targets")
@ns.doc("get_distribution_targets", description="List distribution targets")
@ns.marshal_with(model_dts)
def get(self):
- return { "distributionTargets": da.get_distribution_targets() }, 200
+ return {"distributionTargets": da.get_distribution_targets()}, 200
@ns.doc("post_distribution_targets", description="Create a new distribution target")
@ns.expect(parser_dt_req)
resp = da.add_distribution_target(req)
return resp, 200
+
@ns.route("/distribution-targets/<string:dt_id>", endpoint="resource_distribution_target")
class DistributionTarget(Resource):
@ns.doc("get_distribution_target", description="Get a distribution target instance")
- @ns.response(404, 'Distribution target not found')
- @ns.response(500, 'Internal Server Error')
+ @ns.response(404, "Distribution target not found")
+ @ns.response(500, "Internal Server Error")
@ns.marshal_with(model_dt)
def get(self, dt_id):
result = da.get_distribution_target(dt_id)
frp.abort(code=404, message="Unknown distribution target")
@ns.doc("put_distribution_target", description="Update an existing distribution target")
- @ns.response(404, 'Distribution target not found')
- @ns.response(500, 'Internal Server Error')
+ @ns.response(404, "Distribution target not found")
+ @ns.response(500, "Internal Server Error")
@ns.expect(parser_dt_req)
@ns.marshal_with(model_dt)
def put(self, dt_id):
else:
frp.abort(code=500, message="Problem with storing the update")
- @ns.response(404, 'Distribution target not found')
- @ns.response(500, 'Internal Server Error')
+ @ns.response(404, "Distribution target not found")
+ @ns.response(500, "Internal Server Error")
@ns.doc("delete_distribution_target", description="Delete an existing distribution target")
def delete(self, dt_id):
if da.delete_distribution_target(dt_id):
parser_post_process_group = ns.parser()
-parser_post_process_group.add_argument("processGroupId", required=True,
- trim=True, location="json", help="Process group ID that exists in Nifi")
+parser_post_process_group.add_argument(
+ "processGroupId", required=True, trim=True, location="json", help="Process group ID that exists in Nifi"
+)
+
@ns.route("/distribution-targets/<string:dt_id>/process-groups", endpoint="resource_target_process_groups")
class DTargetProcessGroups(Resource):
-
- @ns.response(404, 'Distribution target not found')
- @ns.response(501, 'Feature is not supported right now')
- @ns.response(500, 'Internal Server Error')
+ @ns.response(404, "Distribution target not found")
+ @ns.response(501, "Feature is not supported right now")
+ @ns.response(500, "Internal Server Error")
@ns.expect(parser_post_process_group)
def post(self, dt_id):
# TODO: Need bucket ID but for now will simply scan through all buckets
# Make sure graph is setup in runtime api
if runc.ensure_graph(runtime_url, pg_id, pg_name) == False:
- frp.abort(code=501 , message="Runtime API: Graph could not be created")
+ frp.abort(code=501, message="Runtime API: Graph could not be created")
# Graph diffing using Nifi registry
def is_debug():
import os
+
if os.environ.get("DISTRIBUTOR_DEBUG", "1") == "1":
return True
else:
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
def get_components_indexed(onboarding_url, list_name_version):
- return dict([
- ((c[0], c[1]), get_component(onboarding_url, c[0], c[1]))
- for c in list_name_version])
+ return dict([((c[0], c[1]), get_component(onboarding_url, c[0], c[1])) for c in list_name_version])
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
for k, v in obj.items():
if k == "link":
- result["selfUrl"] =_urljoin(registry_url, v["href"])
+ result["selfUrl"] = _urljoin(registry_url, v["href"])
result[k] = v
elif type(v) == dict:
result[k] = _add_url_from_link(registry_url, v)
"""Returns list of versions from greatest to least for a given flow"""
versions_url = _urljoin(flow_url, "versions")
# List of versions will be greatest to least
- return list(reversed(sorted(
- [v["version"] for v in _get_json(versions_url)])))
+ return list(reversed(sorted([v["version"] for v in _get_json(versions_url)])))
+
def get_flow_diff(registry_url, flow_url, version_one, version_two):
diff_url = _urljoin(flow_url, "diff", str(version_one), str(version_two))
return _get_json(diff_url)
+
def get_flow_diff_latest(registry_url, flow_url):
versions = get_flow_versions(flow_url)
return None
else:
# Example in gitlab wiki shows that lower version is first
- return _add_url_from_link(registry_url
- , get_flow_diff(registry_url, flow_url, versions[1], versions[0]))
+ return _add_url_from_link(registry_url, get_flow_diff(registry_url, flow_url, versions[1], versions[0]))
+
def get_flow_version(registry_url, flow_url, version):
version_url = _urljoin(flow_url, "versions", str(version))
return _add_url_from_link(registry_url, _get_json(version_url))
+
def get_flow_version_latest(registry_url, flow_url):
return get_flow_version(registry_url, flow_url, "latest")
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
def create_graph(runtime_url, graph_id, graph_name):
url = urljoin(runtime_url, "api/graph/main")
- resp = reqs.post(url, json={"name": graph_name, "id": graph_id
- , "description": "", "main": True})
+ resp = reqs.post(url, json={"name": graph_name, "id": graph_id, "description": "", "main": True})
try:
resp.raise_for_status()
except Exception as e:
with open("runtime-request-failed.json", "w+") as f:
import json
+
json.dump(graph_request, f)
raise errors.DistributorAPIError(e)
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
and create a list of tuples where each tuple is
(component name, component version)"""
extract = lambda p: (p["bundle"]["artifact"], p["bundle"]["version"])
- return [ extract(p) for p in flow["flowContents"]["processors"] ]
+ return [extract(p) for p in flow["flowContents"]["processors"]]
def get_component(flow, components, processor_id):
bundle = p["bundle"]
return components.get((bundle["artifact"], bundle["version"]), None)
- cs = [get_component(p) for p in flow["flowContents"]["processors"] \
- if p["identifier"] == processor_id]
+ cs = [get_component(p) for p in flow["flowContents"]["processors"] if p["identifier"] == processor_id]
return cs[0] if cs else None
if rels_pubs:
_, _, _, transport_type, config_key = rels_pubs[0].split(":")
- src = { "node": comp["id"], "port": config_key }
+ src = {"node": comp["id"], "port": config_key}
else:
# REVIEW: This should be an error?
- src = { "node": comp["id"], "port": None }
+ src = {"node": comp["id"], "port": None}
else:
src = {}
if rels_subs:
_, _, _, transport_type, config_key = rels_subs[0].split(":")
- tgt = { "node": comp["id"], "port": config_key }
+ tgt = {"node": comp["id"], "port": config_key}
else:
# REVIEW: This should be an error?
- tgt = { "node": comp["id"], "port": None }
+ tgt = {"node": comp["id"], "port": None}
else:
tgt = {}
- return { "command": "addedge"
- , "payload": {
- "src": src
- , "tgt": tgt
- , "metadata": {
- "name": conn["name"]
- # TODO: Question these hardcoded attributes
- , "data_type": "json"
- , "dmaap_type": "MR"
- }
- }
- }
+ return {
+ "command": "addedge",
+ "payload": {
+ "src": src,
+ "tgt": tgt,
+ "metadata": {
+ "name": conn["name"]
+ # TODO: Question these hardcoded attributes
+ ,
+ "data_type": "json",
+ "dmaap_type": "MR",
+ },
+ },
+ }
def parse_processor(p):
c = components[(p["bundle"]["artifact"], p["bundle"]["version"])]
- return { "command": "addnode"
- # TODO: spec is required to be a json string but runtime api
- # changing this soon hopefully
- , "payload": { "component_spec": json.dumps(c["spec"])
- , "component_id": c["id"]
- , "name": c["name"]
- , "processor": p }
- }
-
- ps = [ parse_processor(p) for p in flow["flowContents"]["processors"] ]
- cs = [ parse_connection(c) for c in flow["flowContents"]["connections"] ]
- return ps+cs
+ return {
+ "command": "addnode"
+ # TODO: spec is required to be a json string but runtime api
+ # changing this soon hopefully
+ ,
+ "payload": {
+ "component_spec": json.dumps(c["spec"]),
+ "component_id": c["id"],
+ "name": c["name"],
+ "processor": p,
+ },
+ }
+ ps = [parse_processor(p) for p in flow["flowContents"]["processors"]]
+ cs = [parse_connection(c) for c in flow["flowContents"]["connections"]]
+ return ps + cs
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
url = "/".join(full)
if query_params:
- qp = ["{0}={1}".format(quote(k), quote(str(v))) for k,v in query_params.items()]
+ qp = ["{0}={1}".format(quote(k), quote(str(v))) for k, v in query_params.items()]
qp = "&".join(qp)
return "?".join([url, qp])
else:
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
-__version__ = "1.0.1"
+__version__ = "1.1.1"
============LICENSE_START=======================================================
org.onap.dcae
================================================================================
-Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
Copyright 2020 Deutsche Telekom. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
============LICENSE_END=========================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>org.onap.dcaegen2.platform.mod</groupId>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.onap.dcaegen2.platform.mod</groupId>
<artifactId>dcaegen2-platform-mod-distributorapi</artifactId>
<!-- NOTE: Must keep this version synchronized with the version in distributor/version.py file -->
- <version>1.1.0-SNAPSHOT</version>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <sonar.sources>.</sonar.sources>
- <sonar.junit.reportsPath>xunit-results.xml</sonar.junit.reportsPath>
- <sonar.python.coverage.reportPaths>coverage.xml</sonar.python.coverage.reportPaths>
- <sonar.language>py</sonar.language>
- <sonar.pluginname>python</sonar.pluginname>
- <sonar.inclusions>**/*.py</sonar.inclusions>
- <sonar.exclusions>**/tests/**,**/setup.py</sonar.exclusions>
- </properties>
+ <version>1.1.1-SNAPSHOT</version>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <sonar.sources>.</sonar.sources>
+ <sonar.junit.reportsPath>xunit-results.xml</sonar.junit.reportsPath>
+ <sonar.python.coverage.reportPaths>coverage.xml</sonar.python.coverage.reportPaths>
+ <sonar.language>py</sonar.language>
+ <sonar.pluginname>python</sonar.pluginname>
+ <sonar.inclusions>**/*.py</sonar.inclusions>
+ <sonar.exclusions>**/tests/**,**/setup.py</sonar.exclusions>
+ </properties>
</project>
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# extract __version__ from version file. importing distributor will lead to install failures
setup_dir = os.path.dirname(__file__)
-with open(os.path.join(setup_dir, 'distributor', 'version.py')) as file:
+with open(os.path.join(setup_dir, "distributor", "version.py")) as file:
globals_dict = dict()
exec(file.read(), globals_dict)
- __version__ = globals_dict['__version__']
+ __version__ = globals_dict["__version__"]
setup(
- name = "distributor-api",
- version = __version__,
- packages = find_packages(),
- author = "Michael Hwang",
- description = ("API that manages distribution targets"),
- entry_points="""
+ name="distributor-api",
+ version=__version__,
+ packages=find_packages(),
+ author="Michael Hwang",
+ description=("API that manages distribution targets"),
+ entry_points="""
[console_scripts]
start-distributor-api=distributor.http:start_http_server
""",
- install_requires=[
- "Werkzeug==0.16.1",
- "flask-restplus"
- , "Flask-Cors"
- , "requests"
- ],
- zip_safe = False
- )
+ install_requires=["Werkzeug==0.16.1", "flask-restplus", "Flask-Cors", "requests", "MarkupSafe==2.0.1"],
+ zip_safe=False,
+)
# ============LICENSE_START=======================================================
-# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# limitations under the License.
# ============LICENSE_END=========================================================
-from distributor.http import _app as app
-from distributor import config
+"""
+Tests that require a mock requests module, plus a few more
+that didn't fit cleanly elsewhere.
+"""
+
+import copy
+import os
+import re
import pytest
import requests
+from distributor.http import _app as app
+from distributor import config
+from distributor import onboarding_client
+from distributor import utils
+from distributor import errors
+from distributor import data_access
+from distributor import transform
+
class _resp(object):
- def __init__(self, code, json = None):
+ def __init__(self, code, json=None):
self.status_code = code
if json is not None:
self._json = json
-
+
def json(self):
return self._json
def raise_for_status(self):
if self.status_code < 200 or self.status_code >= 300:
- raise Exception('Error response {}'.format(self.status_code))
+ raise Exception("Error response {}".format(self.status_code))
+
class _req(object):
+ # in the test code, you can set
+ # _req.SHOWMATCHES = True
+ # and the match results will be displayed
+ SHOWMATCHES = False
+
def __init__(self, op, url, resp):
self.op = op
- self.url = url;
+ self.url = url
self.resp = resp
def check(self, op, url):
- if op != self.op or url != self.url:
- return None
- return self.resp
+ if _req.SHOWMATCHES:
+ print(f"_req.check(op={op} vs {self.op}, url={url} vs {self.url})")
+ return self.resp if op == self.op and url == self.url else None
+
def _match(answers, op, url):
for choice in answers:
ret = choice.check(op, url)
if ret is not None:
return ret
- message = 'Unexpected request {} {}'.format(op, url)
+ message = "Unexpected request {} {}".format(op, url)
print(message)
raise Exception(message)
+
@pytest.fixture
def mockrequests(monkeypatch):
answers = []
- def get(url, headers = None):
- return _match(answers, 'GET', url)
-
- def post(url, json, headers = None):
- return _match(answers, 'POST', url)
-
- def put(url, json, headers = None):
- return _match(answers, 'PUT', url)
-
- def delete(url, headers = None):
- return _match(answers, 'DELETE', url)
-
- monkeypatch.setattr(requests, 'get', get)
- monkeypatch.setattr(requests, 'post', post)
- monkeypatch.setattr(requests, 'put', put)
- monkeypatch.setattr(requests, 'delete', delete)
+
+ def get(url, headers=None):
+ return _match(answers, "GET", url)
+
+ def post(url, json, headers=None):
+ return _match(answers, "POST", url)
+
+ def put(url, json, headers=None):
+ return _match(answers, "PUT", url)
+
+ def delete(url, headers=None):
+ return _match(answers, "DELETE", url)
+
+ monkeypatch.setattr(requests, "get", get)
+ monkeypatch.setattr(requests, "post", post)
+ monkeypatch.setattr(requests, "put", put)
+ monkeypatch.setattr(requests, "delete", delete)
return answers
+
@pytest.fixture
def client():
- app.config['TESTING'] = True
+ app.config["TESTING"] = True
with app.test_client() as client:
yield client
+
+def isdate(dt):
+ """verify that a string looks like an iso8901 date/time string YYYY-MM-DDTHH:MM:SS.MS"""
+ return re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[.]\d+$", dt)
+
+
+def isuuid(gu):
+ """verify that a string looks like a guid"""
+ return re.match(r"[a-zA-Z0-9]{8}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12}$", gu)
+
+
config.init()
+
def test_api(client, mockrequests):
- dummyflow = {'link': {'href': 'buckets/link1/flows/flow1'}, 'name': 'flowname'}
- mockrequests.extend([
- _req('GET', 'http://nifi-registry:18080/nifi-registry-api/buckets',
- _resp(200, [{'link': {'href':'buckets/link1'}}])),
- _req('GET', 'http://nifi-registry:18080/nifi-registry-api/buckets/link1/flows',
- _resp(200, [dummyflow])),
- _req('POST', 'http://newtarget1/url/api/graph/main',
- _resp(200, {'id':'group1'}))
- ])
+ env_name = "TEST_API_GRAB_ENVIRON"
+ os.environ[env_name] = "xyz"
+ assert config._grab_env(env_name, "foo") == "xyz"
+ assert config._grab_env(env_name) == "xyz"
+ del os.environ[env_name]
+ assert config._grab_env(env_name, "foo") == "foo"
+ try:
+ config._grab_env(env_name)
+ assert not "config._grab_env(env_name) should throw errors.DistributorAPIConfigError"
+ except errors.DistributorAPIConfigError as e:
+ # expected result
+ pass
+
+ dummyflow = {"link": {"href": "buckets/link1/flows/flow1"}, "name": "flowname"}
+
+ nifi_url = "http://nifi-registry:18080/nifi-registry-api"
+ mockrequests.extend(
+ [
+ _req("GET", nifi_url + "/buckets", _resp(200, [{"link": {"href": "buckets/link1"}}])),
+ _req("GET", nifi_url + "/buckets/link1/flows", _resp(200, [dummyflow])),
+ _req("POST", "http://newtarget1/url/api/graph/main", _resp(200, {"id": "group1"})),
+ _req("GET", "/does/not/exist", _resp(404, [{"link": {"href": "does/not/exist"}}])),
+ _req(
+ "GET",
+ "/distributor/distribution-targets/components?name=foo&version=bar",
+ _resp(200, {"id": "groupd", "components": [{"componentUrl": "COMPONENTURL"}]}),
+ ),
+ _req("GET", "COMPONENTURL", _resp(200, {"id": "groupComponentUrl"})),
+ _req(
+ "GET",
+ "/distributor/distribution-targets/components?name=foo&version=bar2",
+ _resp(200, {"id": "groupd", "components": None}),
+ ),
+ ]
+ )
for rule in app.url_map.iter_rules():
- print(rule)
- url = '/distributor/distribution-targets'
- url2 = url + '/notfound'
- url3 = url2 + '/process-groups'
- assert(len(client.get(url).get_json()['distributionTargets']) == 0)
- assert(client.get(url2).status_code == 404)
- assert(client.put(url2, json={'name': 'notfound1', 'runtimeApiUrl': 'http://notfound/url'}).status_code == 404)
- assert(client.delete(url2).status_code == 404)
- assert(client.post(url3, json={'processGroupId': 'group1'}).status_code == 404)
- resp = client.post(url, json={'name': 'target1', 'runtimeApiUrl': 'http://target/url'})
- assert(resp.status_code == 200)
- print(resp.get_json())
- url2 = '/distributor/distribution-targets/' + resp.get_json()['id']
- url3 = url2 + '/process-groups'
- assert(len(client.get(url).get_json()['distributionTargets']) == 1)
- assert(client.get(url2).status_code == 200)
- assert(client.put(url2, json={'name': 'newtarget1', 'runtimeApiUrl': 'http://newtarget1/url'}).status_code == 200)
- assert(client.post(url3, json={'processGroupId': 'group1'}).status_code == 404)
- dummyflow['identifier'] = 'group1'
- assert(client.post(url3, json={'processGroupId': 'group1'}).status_code == 501)
- assert(client.delete(url2).status_code == 200)
- assert(client.delete(url2).status_code == 404)
+ print(rule)
+ url = "/distributor/distribution-targets"
+ url2 = url + "/notfound"
+ url3 = url2 + "/process-groups"
+ assert len(client.get(url).get_json()["distributionTargets"]) == 0
+ assert client.get(url2).status_code == 404
+ assert client.put(url2, json={"name": "notfound1", "runtimeApiUrl": "http://notfound/url"}).status_code == 404
+ assert client.delete(url2).status_code == 404
+ assert client.post(url3, json={"processGroupId": "group1"}).status_code == 404
+ resp = client.post(url, json={"name": "target1", "runtimeApiUrl": "http://target/url"})
+ assert resp.status_code == 200
+
+ # print(resp.get_json())
+ url2 = "/distributor/distribution-targets/" + resp.get_json()["id"]
+ url3 = url2 + "/process-groups"
+ assert len(client.get(url).get_json()["distributionTargets"]) == 1
+
+ assert client.get(url2).status_code == 200
+ assert client.put(url2, json={"name": "newtarget1", "runtimeApiUrl": "http://newtarget1/url"}).status_code == 200
+ assert client.put(url2, json={"name": "newtarget1", "runtimeApiUrl": "http://newtarget1/url"}).status_code == 200
+
+ assert client.post(url3, json={"processGroupId": "group1"}).status_code == 404
+ assert client.post(url3, json={"processGroupId": "group1"}).status_code == 404
+ dummyflow["identifier"] = "group1"
+ assert client.post(url3, json={"processGroupId": "group1"}).status_code == 501
+
+ assert client.delete(url2).status_code == 200
+ assert client.delete(url2).status_code == 404
+ url4 = "/does/not/exist"
+
+ # the following tests do not require an http client but do use requests lib
+
+ # test get_json() exception case
+ try:
+ utils.get_json(url4)
+ assert not "utils.get_json(url4) should throw errors.DistributorAPIError"
+ except errors.DistributorAPIError as e:
+ # expected result
+ pass
+
+ # _req.SHOWMATCHES = True
+ ret = onboarding_client.get_components_indexed(url, [("foo", "bar")])
+ assert ret == {("foo", "bar"): {"id": "groupComponentUrl"}}
+
+ #
+ try:
+ ret = onboarding_client.get_components_indexed(url, [("foo", "bar2")])
+ assert (
+ not "onboarding_client.get_components_indexed(...foo,bar2) should throw errors.DistributorAPIResourceNotFound"
+ )
+ except errors.DistributorAPIResourceNotFound as e:
+ # expected result
+ pass
+
+
+def test_data_access():
+ # various tests for data_access.py
+
+ saved_cache = copy.deepcopy(data_access.get_distribution_targets())
+ ret = data_access.get_distribution_target("ds")
+ assert ret == {}
+
+ # new transform_request()
+ req1 = {"name": "req1", "runtimeApiUrl": "rtau1", "nextDistributionTargetId": "ndti1"}
+ treq1 = data_access.transform_request(req1)
+ assert isdate(treq1["created"])
+ assert isdate(treq1["modified"])
+ assert isuuid(treq1["dt_id"])
+ assert treq1["processGroups"] == []
+
+ # new transform_request()
+ req2 = {"name": "req2", "runtimeApiUrl": "rtau2", "nextDistributionTargetId": "ndti1"}
+ treq2 = data_access.transform_request(req2)
+ assert isdate(treq2["created"])
+ assert isdate(treq2["modified"])
+ assert isuuid(treq2["dt_id"])
+ assert treq2["processGroups"] == []
+
+ # merge_request() should copy certain values from 2nd arg into 1st arg
+ ret = data_access.merge_request(treq1, treq2)
+ assert ret["name"] == treq2["name"]
+ assert ret["runtimeApiUrl"] == treq2["runtimeApiUrl"]
+ assert ret["description"] is None
+ assert ret["nextDistributionTargetId"] == treq2["nextDistributionTargetId"]
+
+ # add_distribution_target() adds to the cache
+ ret = data_access.add_distribution_target({"dt_id": "dt1", "val": "1", "processGroups": []})
+ assert data_access.get_distribution_target("dt1")["val"] == "1"
+
+ # update_distribution_target() updates an existing element of the cache
+ # If the element exists, it returns True
+ ret = data_access.update_distribution_target({"dt_id": "dt1", "val": "1b", "processGroups": []})
+ assert ret
+ assert data_access.get_distribution_target("dt1")["val"] == "1b"
+
+ # update_distribution_target() updates an existing element of the cache
+ # If the element does not exist, it returns False
+ ret = data_access.update_distribution_target({"dt_id": "dt2", "val": "2", "processGroups": []})
+ assert not ret
+
+ # add_process_group adds an element to the processGroups array of the distribution target
+ # if the element exists, returns true, else false
+ assert data_access.add_process_group("dt1", {"processed": "p1"})
+ assert isdate(data_access.get_distribution_target("dt1")["processGroups"][0]["processed"])
+ assert not data_access.add_process_group("dt2", {"processed": "p1"})
+
+    # clean up the cache
+    # delete_distribution_target() returns True if the element existed (and was removed), else False
+ assert data_access.delete_distribution_target("dt1")
+ assert not data_access.delete_distribution_target("dt2")
+
+ assert data_access.get_distribution_targets() == saved_cache
+
+
+def test_transform():
+ # various tests for transform.py
+ flow1 = {"flowContents": {"processors": []}}
+ flow2 = {
+ "flowContents": {
+ "processors": [
+ {
+ "bundle": {"artifact": "artifact1", "version": "version1"},
+ }
+ ]
+ }
+ }
+ flow3 = {
+ "flowContents": {
+ "processors": [
+ {
+ "bundle": {"artifact": "artifact1", "version": "version1"},
+ },
+ {"bundle": {"artifact": "artifact2", "version": "version2"}},
+ ]
+ }
+ }
+ assert transform.extract_components_from_flow(flow1) == []
+ assert transform.extract_components_from_flow(flow2) == [("artifact1", "version1")]
+ assert transform.extract_components_from_flow(flow3) == [("artifact1", "version1"), ("artifact2", "version2")]
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
def test_add_url_from_link():
- test = {"link": {"href": "bar"}, "name": "jane", "age": 33,
- "innerTest": {"link": {"href": "baz"}, "name": "bob"}
- }
+ test = {"link": {"href": "bar"}, "name": "jane", "age": 33, "innerTest": {"link": {"href": "baz"}, "name": "bob"}}
result = rc._add_url_from_link("http://foo", test)
assert result["selfUrl"] == "http://foo/bar"
print(url)
return []
- monkeypatch.setattr(distributor.registry_client, "_get_json",
- fake_get_json_many)
+ monkeypatch.setattr(distributor.registry_client, "_get_json", fake_get_json_many)
assert [3, 2, 1] == rc.get_flow_versions("http://registry/buckets/123/flows/abc/")
def fake_get_flow_versions(url):
return ["1"]
- monkeypatch.setattr(distributor.registry_client, "get_flow_versions",
- fake_get_flow_versions)
+ monkeypatch.setattr(distributor.registry_client, "get_flow_versions", fake_get_flow_versions)
assert None == rc.get_flow_diff_latest("http://registry", "http://registry/buckets/123/flows/abc/")
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
TEST_DIR = os.path.dirname(__file__)
+
def _load_data(filename):
path = os.path.join(TEST_DIR, filename)
with open(path) as f:
return json.load(f)
+
def _setup():
flow = _load_data("flow.json")
components = _load_data("components.json")
assert list(sorted(expected)) == list(sorted(actual))
# Test processor to processor scenario
- expected = {'metadata': {'data_type': 'json',
- 'dmaap_type': 'MR',
- 'name': 'foo-conn'},
- 'src': {'node': '75c9a179-b36b-4985-9445-d44c8768d6eb',
- 'port': 'ves-pnfRegistration-secondary'},
- 'tgt': {'node': '3fadb641-2079-4ca9-bb07-0df5952967fc',
- 'port': 'predict_subscriber'}}
+ expected = {
+ "metadata": {"data_type": "json", "dmaap_type": "MR", "name": "foo-conn"},
+ "src": {"node": "75c9a179-b36b-4985-9445-d44c8768d6eb", "port": "ves-pnfRegistration-secondary"},
+ "tgt": {"node": "3fadb641-2079-4ca9-bb07-0df5952967fc", "port": "predict_subscriber"},
+ }
actual = [e["payload"] for e in fbp if e["command"] == "addedge"]
assert actual[0] == expected or actual[1] == expected
# Test input port to processor scenario
- expected = {'metadata': {'data_type': 'json', 'dmaap_type': 'MR',
- 'name': 'ves-data-conn'}, 'src': {},
- 'tgt': {'node': '75c9a179-b36b-4985-9445-d44c8768d6eb',
- 'port': 'ves-notification'}}
+ expected = {
+ "metadata": {"data_type": "json", "dmaap_type": "MR", "name": "ves-data-conn"},
+ "src": {},
+ "tgt": {"node": "75c9a179-b36b-4985-9445-d44c8768d6eb", "port": "ves-notification"},
+ }
assert actual[0] == expected or actual[1] == expected
# ============LICENSE_START=======================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
from distributor import utils
+# more tests are in test_api.py
+
+
def test_urljoin():
assert "http://foo/bar/baz" == utils.urljoin("http://foo", "bar", "baz")
assert "http://foo/bar/baz" == utils.urljoin("http://foo/", "bar", "baz")
- assert "http://foo/bar/baz?name=some-name&version=1.5.0" \
- == utils.urljoin("http://foo", "bar", "baz", **{"name": "some-name",
- "version": "1.5.0"})
+ assert "http://foo/bar/baz?name=some-name&version=1.5.0" == utils.urljoin(
+ "http://foo", "bar", "baz", **{"name": "some-name", "version": "1.5.0"}
+ )
+# ============LICENSE_START=======================================================
+# Copyright (c) 2022 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
[tox]
-envlist = py37,py38
+envlist = py38,py39,py310
+skip_missing_interpreters = true
[testenv]
deps=
--- /dev/null
+# ============LICENSE_START=======================================================
+# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved.
+
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+major=1
+minor=1
+patch=1
+base_version=${major}.${minor}.${patch}
+release_version=${base_version}
+snapshot_version=${base_version}-SNAPSHOT