1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
18 Tests that require a mock requests module, plus a few more
19 that didn't fit cleanly elsewhere.
27 from distributor.http import _app as app
28 from distributor import config
29 from distributor import onboarding_client
30 from distributor import utils
31 from distributor import errors
32 from distributor import data_access
33 from distributor import transform
# Canned HTTP response used by the mocked requests functions in this file;
# instances are created as _resp(status_code, payload) when test_api lists
# its expected requests.
# NOTE(review): the enclosing class header and the statements that store or
# expose the `json` argument are not visible in this listing -- confirm
# against the full file.
37 def __init__(self, code, json=None):
# Status code reported by this canned response.
38 self.status_code = code
45 def raise_for_status(self):
# Any status outside 200-299 is raised as a plain Exception whose message
# carries the status code.
46 if self.status_code < 200 or self.status_code >= 300:
47 raise Exception("Error response {}".format(self.status_code))
# One expected request: an (operation, URL) pair plus the response to hand
# back when a request matches it.
# NOTE(review): the class header, the SHOWMATCHES attribute and the body of
# __init__ are not visible in this listing; self.op, self.url and self.resp
# are known only from their use in check() -- confirm against the full file.
51 # in the test code, you can set
52 # _req.SHOWMATCHES = True
53 # and the match results will be displayed
56 def __init__(self, op, url, resp):
61 def check(self, op, url):
# Debug trace of the comparison; presumably guarded by SHOWMATCHES (the
# guard line is not visible here).
63 print(f"_req.check(op={op} vs {self.op}, url={url} vs {self.url})")
# Exact match on both operation and URL returns the stored response;
# anything else returns None.
64 return self.resp if op == self.op and url == self.url else None
# Find the canned response for a request.
# Each entry of `answers` is asked, in order, whether it matches (op, url)
# through its check() method. When nothing is returned from the loop, an
# Exception with the message "Unexpected request <op> <url>" is raised.
# NOTE(review): the statements that return a successful match (between the
# check() call and the message construction) are not visible in this listing.
67 def _match(answers, op, url):
68 for choice in answers:
69 ret = choice.check(op, url)
72 message = "Unexpected request {} {}".format(op, url)
74 raise Exception(message)
# Replaces requests.get/post/put/delete with local stand-ins for the duration
# of a test, using monkeypatch.setattr. The stand-ins ignore headers and
# request bodies and resolve their result with _match() on the HTTP verb and
# URL only.
# NOTE(review): presumably a pytest fixture (test_api takes it as a
# parameter); the decorator, the definition of `answers` and whatever the
# function returns or yields are not visible in this listing.
78 def mockrequests(monkeypatch):
81 def get(url, headers=None):
82 return _match(answers, "GET", url)
# `json` is accepted to mirror the caller's keyword but is not inspected.
84 def post(url, json, headers=None):
85 return _match(answers, "POST", url)
87 def put(url, json, headers=None):
88 return _match(answers, "PUT", url)
90 def delete(url, headers=None):
91 return _match(answers, "DELETE", url)
# Install the stand-ins on the requests module.
93 monkeypatch.setattr(requests, "get", get)
94 monkeypatch.setattr(requests, "post", post)
95 monkeypatch.setattr(requests, "put", put)
96 monkeypatch.setattr(requests, "delete", delete)
# Switches the application into testing mode and opens a test client on it.
# NOTE(review): presumably the body of the `client` fixture consumed by
# test_api; the def line and the statement that hands the client to the test
# are not visible in this listing.
102 app.config["TESTING"] = True
103 with app.test_client() as client:
# NOTE(review): the def line is not visible in this listing; the helper is
# called as isdate(value) in test_data_access.
108 """verify that a string looks like an ISO 8601 date/time string YYYY-MM-DDTHH:MM:SS.MS"""
# re.match anchors at the start of `dt` and the trailing `$` anchors the end;
# the result is a match object (truthy) or None. The fractional-seconds part
# is required by the pattern.
109 return re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[.]\d+$", dt)
# NOTE(review): the def line is not visible in this listing; the helper is
# called as isuuid(value) in test_data_access.
113 """verify that a string looks like a guid"""
# Only the 8-4-4-4-12 grouping of ASCII letters/digits is checked; the
# pattern does not restrict characters to hexadecimal digits. Returns a match
# object (truthy) or None.
114 return re.match(r"[a-zA-Z0-9]{8}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12}$", gu)
# Exercises config._grab_env, the /distributor/distribution-targets routes
# (through `client`) and onboarding_client.get_components_indexed, with
# outbound HTTP answered from a list of canned _req/_resp pairs.
# NOTE(review): several statements are not visible in this listing (for
# example the `try:` lines paired with the `except` clauses, the assignment
# that receives the canned answers, and the utils.get_json call), so the
# comments below describe only what the visible lines show.
120 def test_api(client, mockrequests):
# --- config._grab_env ---
121 env_name = "TEST_API_GRAB_ENVIRON"
122 os.environ[env_name] = "xyz"
# With the variable set, its value is returned with or without a default.
123 assert config._grab_env(env_name, "foo") == "xyz"
124 assert config._grab_env(env_name) == "xyz"
125 del os.environ[env_name]
# With the variable unset, the supplied default is returned.
126 assert config._grab_env(env_name, "foo") == "foo"
# With no variable and no default the call is expected to raise.
# `assert not "<non-empty string>"` always fails, so it is reached only when
# the expected exception was not raised.
128 config._grab_env(env_name)
129 assert not "config._grab_env(env_name) should throw errors.DistributorAPIConfigError"
130 except errors.DistributorAPIConfigError as e:
# --- canned answers for the mocked requests functions ---
# dummyflow is mutated later in this test; the same dict object is placed in
# the "/buckets/link1/flows" answer, so the mutation changes what that mocked
# GET returns.
134 dummyflow = {"link": {"href": "buckets/link1/flows/flow1"}, "name": "flowname"}
136 nifi_url = "http://nifi-registry:18080/nifi-registry-api"
139 _req("GET", nifi_url + "/buckets", _resp(200, [{"link": {"href": "buckets/link1"}}])),
140 _req("GET", nifi_url + "/buckets/link1/flows", _resp(200, [dummyflow])),
# presumably the URL the application derives from the runtimeApiUrl
# "http://newtarget1/url" set by the PUT below -- confirm against the app.
141 _req("POST", "http://newtarget1/url/api/graph/main", _resp(200, {"id": "group1"})),
142 _req("GET", "/does/not/exist", _resp(404, [{"link": {"href": "does/not/exist"}}])),
145 "/distributor/distribution-targets/components?name=foo&version=bar",
146 _resp(200, {"id": "groupd", "components": [{"componentUrl": "COMPONENTURL"}]}),
148 _req("GET", "COMPONENTURL", _resp(200, {"id": "groupComponentUrl"})),
# Same query with version=bar2 answers with "components": None.
151 "/distributor/distribution-targets/components?name=foo&version=bar2",
152 _resp(200, {"id": "groupd", "components": None}),
# NOTE(review): the loop body is not visible in this listing.
156 for rule in app.url_map.iter_rules():
# --- HTTP routes, before any target exists ---
158 url = "/distributor/distribution-targets"
159 url2 = url + "/notfound"
160 url3 = url2 + "/process-groups"
# The target list starts empty and every operation on the unknown id
# "notfound" answers 404.
161 assert len(client.get(url).get_json()["distributionTargets"]) == 0
162 assert client.get(url2).status_code == 404
163 assert client.put(url2, json={"name": "notfound1", "runtimeApiUrl": "http://notfound/url"}).status_code == 404
164 assert client.delete(url2).status_code == 404
165 assert client.post(url3, json={"processGroupId": "group1"}).status_code == 404
# --- create a target, then address it by the returned id ---
166 resp = client.post(url, json={"name": "target1", "runtimeApiUrl": "http://target/url"})
167 assert resp.status_code == 200
169 # print(resp.get_json())
170 url2 = "/distributor/distribution-targets/" + resp.get_json()["id"]
171 url3 = url2 + "/process-groups"
172 assert len(client.get(url).get_json()["distributionTargets"]) == 1
174 assert client.get(url2).status_code == 200
# The same PUT is issued twice; both are expected to succeed.
175 assert client.put(url2, json={"name": "newtarget1", "runtimeApiUrl": "http://newtarget1/url"}).status_code == 200
176 assert client.put(url2, json={"name": "newtarget1", "runtimeApiUrl": "http://newtarget1/url"}).status_code == 200
# While the canned flow has no "identifier", posting the process group
# answers 404; once dummyflow carries "identifier" == "group1" the expected
# status becomes 501.
178 assert client.post(url3, json={"processGroupId": "group1"}).status_code == 404
179 assert client.post(url3, json={"processGroupId": "group1"}).status_code == 404
180 dummyflow["identifier"] = "group1"
181 assert client.post(url3, json={"processGroupId": "group1"}).status_code == 501
# Deleting succeeds once; a second delete of the same id answers 404.
183 assert client.delete(url2).status_code == 200
184 assert client.delete(url2).status_code == 404
185 url4 = "/does/not/exist"
187 # the following tests do not require an http client but do use requests lib
189 # test get_json() exception case
# NOTE(review): the call under test is not visible here; per the message
# below it is presumably utils.get_json(url4).
192 assert not "utils.get_json(url4) should throw errors.DistributorAPIError"
193 except errors.DistributorAPIError as e:
197 # _req.SHOWMATCHES = True
# For ("foo", "bar") the result is keyed by that tuple and holds the body
# served by the canned "COMPONENTURL" answer.
198 ret = onboarding_client.get_components_indexed(url, [("foo", "bar")])
199 assert ret == {("foo", "bar"): {"id": "groupComponentUrl"}}
# For ("foo", "bar2") the canned answer has "components": None and the call
# is expected to raise DistributorAPIResourceNotFound.
203 ret = onboarding_client.get_components_indexed(url, [("foo", "bar2")])
205 not "onboarding_client.get_components_indexed(...foo,bar2) should throw errors.DistributorAPIResourceNotFound"
207 except errors.DistributorAPIResourceNotFound as e:
# Exercises the data_access helpers: transform_request, merge_request and the
# add/update/delete operations on the distribution-target cache.
# NOTE(review): a few statements (presumably asserts on `ret`) are not
# visible in this listing.
212 def test_data_access():
213 # various tests for data_access.py
# Snapshot of the targets so the final assert can check that the test left
# them as it found them.
215 saved_cache = copy.deepcopy(data_access.get_distribution_targets())
216 ret = data_access.get_distribution_target("ds")
219 # new transform_request()
220 req1 = {"name": "req1", "runtimeApiUrl": "rtau1", "nextDistributionTargetId": "ndti1"}
221 treq1 = data_access.transform_request(req1)
# The transformed request is expected to carry timestamps, a generated id and
# an empty processGroups list.
222 assert isdate(treq1["created"])
223 assert isdate(treq1["modified"])
224 assert isuuid(treq1["dt_id"])
225 assert treq1["processGroups"] == []
227 # new transform_request()
228 req2 = {"name": "req2", "runtimeApiUrl": "rtau2", "nextDistributionTargetId": "ndti1"}
229 treq2 = data_access.transform_request(req2)
230 assert isdate(treq2["created"])
231 assert isdate(treq2["modified"])
232 assert isuuid(treq2["dt_id"])
233 assert treq2["processGroups"] == []
235 # merge_request() should copy certain values from 2nd arg into 1st arg
236 ret = data_access.merge_request(treq1, treq2)
237 assert ret["name"] == treq2["name"]
238 assert ret["runtimeApiUrl"] == treq2["runtimeApiUrl"]
# Neither input request supplied a "description".
239 assert ret["description"] is None
240 assert ret["nextDistributionTargetId"] == treq2["nextDistributionTargetId"]
242 # add_distribution_target() adds to the cache
243 ret = data_access.add_distribution_target({"dt_id": "dt1", "val": "1", "processGroups": []})
244 assert data_access.get_distribution_target("dt1")["val"] == "1"
246 # update_distribution_target() updates an existing element of the cache
247 # If the element exists, it returns True
248 ret = data_access.update_distribution_target({"dt_id": "dt1", "val": "1b", "processGroups": []})
250 assert data_access.get_distribution_target("dt1")["val"] == "1b"
252 # update_distribution_target() updates an existing element of the cache
253 # If the element does not exist, it returns False
254 ret = data_access.update_distribution_target({"dt_id": "dt2", "val": "2", "processGroups": []})
257 # add_process_group adds an element to the processGroups array of the distribution target
258 # if the element exists, returns true, else false
259 assert data_access.add_process_group("dt1", {"processed": "p1"})
# The stored "processed" value is expected to look like a timestamp rather
# than the "p1" that was passed in.
260 assert isdate(data_access.get_distribution_target("dt1")["processGroups"][0]["processed"])
261 assert not data_access.add_process_group("dt2", {"processed": "p1"})
264 # if the element exists,
# Deleting the existing "dt1" is truthy; deleting the never-added "dt2" is falsy.
265 assert data_access.delete_distribution_target("dt1")
266 assert not data_access.delete_distribution_target("dt2")
# The targets must equal the snapshot taken at the start of the test.
268 assert data_access.get_distribution_targets() == saved_cache
# Checks transform.extract_components_from_flow against flows with zero, one
# and two processors; the expected result is a list of (artifact, version)
# tuples in processor order.
# NOTE(review): the flow2 and flow3 literals are only partly visible in this
# listing (only some of their "bundle" entries appear below).
271 def test_transform():
272 # various tests for transform.py
273 flow1 = {"flowContents": {"processors": []}}
278 "bundle": {"artifact": "artifact1", "version": "version1"},
287 "bundle": {"artifact": "artifact1", "version": "version1"},
289 {"bundle": {"artifact": "artifact2", "version": "version2"}},
# An empty processors list yields an empty component list.
293 assert transform.extract_components_from_flow(flow1) == []
294 assert transform.extract_components_from_flow(flow2) == [("artifact1", "version1")]
295 assert transform.extract_components_from_flow(flow3) == [("artifact1", "version1"), ("artifact2", "version2")]