[DCAE] INFO.yaml update
[dcaegen2/platform.git] / adapter / acumos / aoconversion / scanner.py
1 # ============LICENSE_START====================================================
2 # org.onap.dcae
3 # =============================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2021 highstreet technologies GmbH. All rights reserved.
6 # =============================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END======================================================
19
20 import json
21 import os
22 from pkg_resources import resource_string
23 import shutil
24 import traceback
25 try:
26     from urllib.parse import unquote_plus
27     from socketserver import ThreadingMixIn
28     from http.server import BaseHTTPRequestHandler, HTTPServer
29 except ImportError:
30     from urllib import unquote_plus
31     from SocketServer import ThreadingMixIn
32     from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
33
34 import requests
35
36 from aoconversion import convert
37
38
39 def _derefconfig(value):
40     if value.startswith('@'):
41         with open(value[1:], 'r') as f:
42             return f.readline().strip()
43     return value
44
45
class Config(object):
    """
    Configuration parameters as attributes, make sure the required ones are there,
    populate defaults.

    Positional parameters are required; keyword parameters carry defaults.
    Unknown keyword arguments are accepted through ``extras`` and ignored.

    Attributes set by the constructor:
      dcaeuser, acumosurl, certfile, certverify, dockerhost, dockerregistry,
      dockeruser, interval, tmpdir, port -- stored as given.
      http_proxy, https_proxy, no_proxy  -- stored as given, with None
                                            replaced by an empty string.
      catalogs -- None, or a list (a single non-list value is wrapped in a
                  one-element list).
      oburl    -- callable(fmt, *args, **kwargs) returning onboardingurl
                  followed by fmt.format(*args, **kwargs).
    """
    def __init__(self, dcaeuser, onboardingurl, onboardinguser, onboardingpass, certfile, dockerregistry, dockeruser, dockerpass, http_proxy, https_proxy, no_proxy, acumosurl=None, interval=900, dockerhost='tcp://localhost:2375', tmpdir='/var/tmp/aoadapter', certverify=True, catalogs=None, port=None, **extras):
        self.dcaeuser = dcaeuser

        def oburl(fmt, *args, **kwargs):
            # Build a URL under the onboarding base URL.
            return onboardingurl + fmt.format(*args, **kwargs)
        self.oburl = oburl
        # Secrets are kept as given (possibly '@file' references) and only
        # resolved on demand by obauth() / dockerpass().
        self._onboardingpass = onboardingpass
        self._onboardinguser = onboardinguser
        self.acumosurl = acumosurl
        self.certfile = certfile
        self.certverify = certverify
        self.dockerhost = dockerhost
        self.dockerregistry = dockerregistry
        self.dockeruser = dockeruser
        self._dockerpass = dockerpass
        self.interval = interval
        self.tmpdir = tmpdir
        self.http_proxy = http_proxy if http_proxy is not None else ""
        # Each proxy setting is defaulted on its own value; the HTTPS proxy
        # must not depend on whether an HTTP proxy was supplied.
        self.https_proxy = https_proxy if https_proxy is not None else ""
        self.no_proxy = no_proxy if no_proxy is not None else ""
        if catalogs is not None and not isinstance(catalogs, list):
            catalogs = [catalogs]
        self.catalogs = catalogs
        self.port = port

    def obauth(self):
        """Return the (user, password) tuple for the onboarding service."""
        return (self._onboardinguser, _derefconfig(self._onboardingpass))

    def dockerpass(self):
        """Return the docker registry password, resolving an '@file' reference."""
        return _derefconfig(self._dockerpass)
81
82
83 class _AcumosAccess(object):
84     def __init__(self, config, url):
85         self.cert = config.certfile
86         self.verify = config.certverify
87         self.url = url.strip().rstrip('/')
88
89     def artgetter(self, xrev, matcher):
90         for art in xrev['artifacts']:
91             if matcher(art):
92                 def ret():
93                     nurl = '{}/artifacts/{}/content'.format(self.url, art['artifactId'])
94                     resp = requests.get(nurl, stream=True, cert=self.cert, verify=self.verify)
95                     if resp.status_code == 500:
96                         resp = requests.get(nurl, stream=True, cert=self.cert, verify=self.verify)
97                     resp.raise_for_status()
98                     return resp
99                 return ret
100         return None
101
102     def jsonget(self, format, *args, **kwargs):
103         nurl = self.url + format.format(*args, **kwargs)
104         resp = requests.get(nurl, cert=self.cert, verify=self.verify)
105         if resp.status_code == 500:
106             resp = requests.get(nurl, cert=self.cert, verify=self.verify)
107         resp.raise_for_status()
108         return resp.json()['content']
109
110
111 def _x_proto_matcher(art):
112     """ Is this artifact the x.proto file? """
113     return art['name'].endswith('.proto')
114
115
116 def _x_zip_matcher(art):
117     """ Is this artifact the x.zip file? """
118     return art['name'].endswith('.zip')
119
120
121 def _md_json_matcher(art):
122     """ Is this artifact the metadata.json file? """
123     return art['name'].__contains__('metadata') & art['name'].endswith('.json')
124
125
def _walk(config):
    """
    Walk the Federation E5 interface of an Acumos instance.

    Every revision of every solution in every selected catalog is handed to
    onboard().  When config.catalogs is set, only catalogs whose id or name
    appears in it are visited; otherwise all catalogs are.
    """
    base_url = config.acumosurl
    handler = _makecallback(config)
    wanted = config.catalogs
    client = _AcumosAccess(config, base_url)
    for cat in client.jsonget('/catalogs'):
        selected = wanted is None or cat['catalogId'] in wanted or cat['name'] in wanted
        if not selected:
            continue
        for sol in client.jsonget('/solutions?catalogId={}', cat['catalogId']):
            for rev in client.jsonget('/solutions/{}/revisions', sol['solutionId']):
                onboard(client, handler, sol, rev['revisionId'])
140
141
def onboard(aa, callback, solution, revid):
    """
    Fetch one revision of a solution and pass its details to callback.

    aa is used to read the revision record and to build the artifact
    getters (proto, zip, metadata); each getter is None when the revision
    has no matching artifact.  The rating passed on is the solution's
    'ratingAverageTenths' divided by ten.
    """
    xrev = aa.jsonget('/solutions/{}/revisions/{}', solution['solutionId'], revid)
    details = {
        'model_name': solution['name'],
        'model_version': xrev['version'],
        'model_last_updated': xrev['modified'],
        'rating': solution['ratingAverageTenths'] / 10.0,
        'proto_getter': aa.artgetter(xrev, _x_proto_matcher),
        'zip_getter': aa.artgetter(xrev, _x_zip_matcher),
        'metadata_getter': aa.artgetter(xrev, _md_json_matcher),
    }
    callback(**details)
146
147
148 def _pullfile(source, dest):
149     with open(dest, 'wb') as f:
150         for chunk in source().iter_content(65536):
151             f.write(chunk)
152
153
# Module-level memory of what has already been handled in this process, shared
# by every callback built by _makecallback():
#   _loadedformats    -- (name, version) tuples of data formats already loaded
#                        (or found to be previously loaded).
#   _loadedcomponents -- (model_name, model_version) tuples of components that
#                        were loaded, found previously loaded, or skipped.
_loadedformats = set()
_loadedcomponents = set()
156
157
def scan(config):
    """
    Run one pass over the Acumos instance described by config.

    Delegates to _walk(); returns nothing.
    """
    _walk(config)
160
161
162 def _makecallback(config):
163     workdir = config.tmpdir
164     obauth = config.obauth()
165     oburl = config.oburl
166
167     def callback(model_name, model_version, model_last_updated, rating, proto_getter, zip_getter, metadata_getter):
168         model_name = model_name.lower()
169         model_version = model_version.lower()
170         compid = (model_name, model_version)
171         if compid in _loadedcomponents:
172             print('Skipping component {}: already analyzed'.format(compid))
173             return
174         if proto_getter is None or zip_getter is None or metadata_getter is None:
175             print('Skipping component {}: does not have required artifacts'.format(compid))
176             _loadedcomponents.add(compid)
177             return
178         modeldir = '{}/{}'.format(workdir, model_name)
179         shutil.rmtree(modeldir, True)
180         os.makedirs(modeldir)
181         try:
182             _pullfile(proto_getter, '{}/model.proto'.format(modeldir))
183             _pullfile(zip_getter, '{}/model.zip'.format(modeldir))
184             _pullfile(metadata_getter, '{}/metadata.json'.format(modeldir))
185         except Exception:
186             print('Skipping component {}: artifact access error'.format(compid))
187             _loadedcomponents.add(compid)
188             return
189         try:
190             docker_uri, data_formats, spec = convert.gen_dcae_artifacts_for_model(config, model_name, model_version)
191             shutil.rmtree(modeldir)
192         except Exception:
193             print('Error analyzing artifacts for {}'.format(compid))
194             traceback.print_exc()
195             return
196         for data_format in data_formats:
197             fmtid = (data_format['self']['name'], data_format['self']['version'])
198             if fmtid in _loadedformats:
199                 print('Skipping data format {}: already analyzed'.format(fmtid))
200                 continue
201             try:
202                 resp = requests.post(oburl('/dataformats'), json={'owner': config.dcaeuser, 'spec': data_format}, auth=obauth)
203                 if resp.status_code == 409:
204                     print('Skipping data format {}: previously loaded'.format(fmtid))
205                     _loadedformats.add(fmtid)
206                     continue
207                 resp.raise_for_status()
208                 requests.patch(resp.json()['dataFormatUrl'], json={'owner': config.dcaeuser, 'status': 'published'}, auth=obauth).raise_for_status()
209                 print('Loaded data format {}'.format(fmtid))
210                 _loadedformats.add(fmtid)
211             except Exception:
212                 print('Error loading data format {}'.format(fmtid))
213                 traceback.print_exc()
214                 raise
215         try:
216             resp = requests.post(oburl('/components'), json={'owner': config.dcaeuser, 'spec': spec}, auth=obauth)
217             if resp.status_code == 409:
218                 print('Skipping component {}: previously loaded'.format(compid))
219                 _loadedcomponents.add(compid)
220                 return
221             resp.raise_for_status()
222             requests.patch(resp.json()['componentUrl'], json={'owner': config.dcaeuser, 'status': 'published'}, auth=obauth).raise_for_status()
223             print('Loaded component {}'.format(compid))
224             _loadedcomponents.add(compid)
225         except Exception:
226             print('Error loading component {}'.format(compid))
227             traceback.print_exc()
228             raise
229     return callback
230
231
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer combined with ThreadingMixIn; used by serve() with Apihandler."""
    # NOTE(review): the standard library documents protocol_version as an
    # attribute of the request handler class; nothing in this file reads it
    # from the server object -- confirm whether this setting has any effect.
    protocol_version = "HTTP/1.1"
234
235
class Apihandler(BaseHTTPRequestHandler):
    """
    Request handler for the adapter's HTTP API.

    GET serves the index page and the listCatalogs / listSolutions /
    listRevisions JSON queries; POST to onboard.js triggers onboarding.
    All API calls name the Acumos instance through the 'acumos' query
    parameter.  The handler expects the server object to carry 'config',
    'callback' and 'index' attributes.
    """
    # NOTE(review): the 'acumos' parameter is a caller-supplied URL that this
    # process then contacts using its configured client certificate -- confirm
    # that the API is only reachable by trusted callers.

    def doqp(self):
        """
        Split the query string off self.path and decode it into self.qparams.

        A segment without '=' is stored with an empty value, and empty
        segments (as in 'a=1&&b=2') are ignored, so a malformed query string
        cannot raise.  When a name repeats, the last value wins.
        """
        self.qparams = {}
        if not self.path or '?' not in self.path:
            return
        self.path, query = self.path.split('?', 1)
        for pair in query.split('&'):
            if not pair:
                continue
            key, _, value = pair.partition('=')
            self.qparams[unquote_plus(key)] = unquote_plus(value)

    def replyjson(self, body, ctype='application/json'):
        """Send body, serialized as UTF-8 encoded JSON, as a 200 response."""
        self.replyraw(json.dumps(body).encode('utf-8'), ctype)

    def replyraw(self, data, ctype):
        """Send the bytes in data as a 200 response with content type ctype."""
        self.send_response(200)
        self.send_header('Content-Type', ctype)
        self.send_header('Content-Length', str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _onboard_revisions(self, aa, callback, solution):
        """Onboard every revision of one solution."""
        for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
            onboard(aa, callback, solution, revision['revisionId'])

    def do_GET(self):
        """
        Serve the index page or one of the list queries.

        Replies 400 when the 'acumos' parameter (or the id parameter a query
        needs) is missing and 404 for an unknown path.
        """
        self.doqp()
        if self.path in ('/', '/index.html', '/acumos-adapter/', '/acumos-adapter/index.html'):
            self.replyraw(self.server.index, 'text/html')
            return
        if 'acumos' not in self.qparams:
            self.send_error(400)
            return
        aa = _AcumosAccess(self.server.config, self.qparams['acumos'])
        if self.path == '/acumos-adapter/listCatalogs.js':
            self.replyjson([{'name': x['name'], 'id': x['catalogId']} for x in aa.jsonget('/catalogs')])
            return
        if self.path == '/acumos-adapter/listSolutions.js':
            if 'catalogId' not in self.qparams:
                self.send_error(400)
                return
            self.replyjson([{'name': x['name'], 'id': x['solutionId']} for x in aa.jsonget('/solutions?catalogId={}', self.qparams['catalogId'])])
            return
        if self.path == '/acumos-adapter/listRevisions.js':
            if 'solutionId' not in self.qparams:
                self.send_error(400)
                return
            self.replyjson([{'name': x['version'], 'id': x['revisionId']} for x in aa.jsonget('/solutions/{}/revisions', self.qparams['solutionId'])])
            return
        self.send_error(404)

    def do_POST(self):
        """
        Handle POST /acumos-adapter/onboard.js.

        The scope narrows with the parameters present: no catalogId onboards
        every catalog; catalogId alone onboards that catalog; adding
        solutionId onboards that solution's revisions; adding revisionId
        onboards that single revision.  Replies 'OK' on completion and 400
        when 'acumos' is missing or the path is anything else.
        """
        self.doqp()
        if self.path == '/acumos-adapter/onboard.js':
            if 'acumos' not in self.qparams:
                self.send_error(400)
                return
            aa = _AcumosAccess(self.server.config, self.qparams['acumos'])
            callback = self.server.callback
            if 'catalogId' not in self.qparams:
                for catalog in aa.jsonget('/catalogs'):
                    for solution in aa.jsonget('/solutions?catalogId={}', catalog['catalogId']):
                        self._onboard_revisions(aa, callback, solution)
            elif 'solutionId' not in self.qparams:
                for solution in aa.jsonget('/solutions?catalogId={}', self.qparams['catalogId']):
                    self._onboard_revisions(aa, callback, solution)
            elif 'revisionId' not in self.qparams:
                solution = aa.jsonget('/solutions/{}', self.qparams['solutionId'])
                self._onboard_revisions(aa, callback, solution)
            else:
                solution = aa.jsonget('/solutions/{}', self.qparams['solutionId'])
                onboard(aa, callback, solution, self.qparams['revisionId'])
            self.replyraw('OK'.encode('utf-8'), 'text/plain')
            return
        self.send_error(400)
309
310
def serve(config):
    """
    Run the adapter's HTTP API until the process is stopped.

    A ThreadedHTTPServer is bound to config.port with an empty host string
    and given the configuration, an onboarding callback and the packaged
    index.html before serve_forever() is entered.
    """
    httpd = ThreadedHTTPServer(('', config.port), Apihandler)
    httpd.config = config
    httpd.callback = _makecallback(config)
    httpd.index = resource_string(__name__, 'index.html')
    httpd.serve_forever()
315     server.index = resource_string(__name__, 'index.html')
316     server.serve_forever()