4d3636fa8d892dca4ec9d18965cc09642423cdba
[dcaegen2/platform.git] / adapter / acumos / tests / test_fed.py
1 # ============LICENSE_START====================================================
2 # org.onap.dcae
3 # =============================================================================
4 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
5 # =============================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END======================================================
18
19 import json
20 import requests
21
22 from testing_helpers import get_json_fixture as get_test_json
23 from testing_helpers import get_fixture_path as get_test_file
24
25 from aoconversion import docker_gen as aoc_docker_gen
26 from aoconversion import scanner as aoc_scanner
27
28 #
29 # General mocking
30 #
31
32
33 class _MockModule:
34     """
35     Class to mock packages.
36     Just a quick way to create an object with specified attributes
37     """
38     def __init__(self, **kwargs):
39         for k, v in kwargs.items():
40             setattr(self, k, v)
41
42
43 #
44 # Mocking for package "docker"
45 #
46
47
48 class _MockAPIClient:
49     """
50     Class to mock docker.APIClient class.
51     """
52     def __init__(self, base_url, version=None, user_agent='xxx'):
53         pass
54
55     def build(self, path, rm, tag):
56         return [b'some message', b'another message']
57
58     def push(self, repository, auth_config, stream):
59         return [b'some message', b'another message']
60
61     def images(self, x):
62         return True
63
64
65 def _mock_kwargs_from_env(**kwargs):
66     """
67     Method to mock docker.utils.kwargs_from_env method.
68     """
69     return {'base_url': None}
70
71
72 _mockdocker = _MockModule(
73     APIClient=_MockAPIClient,
74     utils=_MockModule(kwargs_from_env=_mock_kwargs_from_env))
75
76
77 #
78 # Mocking for requests.get
79 #
80
81
82 class _r:
83     """
84     Fake responses for mocking requests.get
85     """
86     def __init__(self, json=None, file=None, data=None):
87         self.jx = json
88         self.fx = file
89         self.dx = data
90         self.status_code = 200
91
92     @property
93     def text(self):
94         return self._raw().decode()
95
96     def raise_for_status(self):
97         pass
98
99     def _raw(self):
100         if self.dx is None:
101             if self.fx is not None:
102                 with open(self.fx, 'rb') as f:
103                     self.dx = f.read()
104             elif self.jx is not None:
105                 self.dx = json.dumps(self.jx, sort_keys=True).encode()
106             else:
107                 self.dx = b''
108         return self.dx
109
110     def iter_content(self, bsize=-1):
111         buf = self._raw()
112         pos = 0
113         lim = len(buf)
114         if bsize <= 0:
115             bsize = lim
116         while pos + bsize < lim:
117             yield buf[pos:pos + bsize]
118             pos = pos + bsize
119         yield buf[pos:]
120
121     def json(self):
122         if self.jx is None:
123             self.jx = json.loads(self._raw().decode())
124         return self.jx
125
126
127 def _mockwww(responses):
128     def _op(path, json=None, auth=None, cert=None, verify=None, stream=False):
129         return responses[path]
130     return _op
131
132
133 _mockpostdata = {
134     'https://onboarding/dataformats': _r({'dataFormatUrl': 'https://onboarding/dataformats/somedfid'}),
135     'https://onboarding/components': _r({'componentUrl': 'https://onboarding/components/somedxid'}),
136 }
137
138 _mockpatchdata = {
139     'https://onboarding/dataformats/somedfid': _r({}),
140     'https://onboarding/components/somedxid': _r({}),
141 }
142
143 _mockwebdata = {
144     'https://acumos/catalogs': _r({'content': [{'catalogId': 'c1'}]}),
145     'https://acumos/solutions?catalogId=c1': _r({'content': [{'solutionId': 's1', 'name': 'example-model', 'ratingAverageTenths': 17}]}),
146     'https://acumos/solutions/s1/revisions': _r({'content': [{'revisionId': 'r1'}]}),
147     'https://acumos/solutions/s1/revisions/r1': _r({'content': {
148         'version': 'v1',
149         'modified': '2019-01-01T00:00:00Z',
150         'artifacts': [
151             {'artifactId': 'a1', 'name': 'xxx.other'},
152             {'artifactId': 'a2', 'name': 'xxx.proto'},
153             {'artifactId': 'a3', 'name': 'xxx.zip'},
154             {'artifactId': 'a4', 'name': 'xxx.json'},
155         ]}
156     }),
157     'https://acumos/artifacts/a2/content': _r(file=get_test_file('models/example-model/model.proto')),
158     'https://acumos/artifacts/a3/content': _r(data=b'dummy zip archive data'),
159     'https://acumos/artifacts/a4/content': _r(file=get_test_file('models/example-model/metadata.json')),
160     'http://json-schema.org/draft-04/schema#': _r(get_test_json('jsdraft4schema.json')),
161     'https://gerrit.onap.org/r/gitweb?p=dcaegen2/platform/cli.git;a=blob_plain;f=component-json-schemas/data-format/dcae-cli-v1/data-format-schema.json;hb=HEAD': _r(get_test_json('dataformat_101.json')),
162     'http://dcaeurl//component-json-schemas/data-format/dcae-cli-v1/data-format-schema.json': _r(get_test_json('dataformat_101.json')),
163     'https://gerrit.onap.org/r/gitweb?p=dcaegen2/platform/cli.git;a=blob_plain;f=component-json-schemas/component-specification/dcae-cli-v2/component-spec-schema.json;hb=HEAD': _r(get_test_json('dcae-cli-v2_component-spec-schema.json')),
164     'http://dcaeurl//component-json-schemas/component-specification/dcae-cli-v2/component-spec-schema.json': _r(get_test_json('dcae-cli-v2_component-spec-schema.json')),
165 }
166
167
168 #
169 # End mocking tools
170 #
171
172
173 def test_aoconversion(tmpdir, monkeypatch):
174     config = aoc_scanner.Config(dcaeurl='http://dcaeurl', dcaeuser='dcaeuser', onboardingurl='https://onboarding', onboardinguser='obuser', onboardingpass='obpass', acumosurl='https://acumos', certfile=None, dockerregistry='dockerregistry', dockeruser='registryuser', dockerpass='registrypassword')
175     monkeypatch.setattr(aoc_docker_gen, 'APIClient', _mockdocker.APIClient)
176     monkeypatch.setattr(requests, 'get', _mockwww(_mockwebdata))
177     monkeypatch.setattr(requests, 'post', _mockwww(_mockpostdata))
178     monkeypatch.setattr(requests, 'patch', _mockwww(_mockpatchdata))
179     aoc_scanner.scan(config)
180     aoc_scanner.scan(config)