18dd70186d48a896019232d1767d65735bc45bf2
[dcaegen2/platform.git] / adapter / acumos / aoconversion / scanner.py
1 # ============LICENSE_START====================================================
2 # org.onap.dcae
3 # =============================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # =============================================================================
6 # Copyright (c) 2021 highstreet technologies GmbH. All rights reserved.
7 # =============================================================================
8 # Licensed under the Apache License, Version 2.0 (the "License");
9 # you may not use this file except in compliance with the License.
10 # You may obtain a copy of the License at
11 #
12 #      http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS,
16 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 # See the License for the specific language governing permissions and
18 # limitations under the License.
19 # ============LICENSE_END======================================================
20
21 import json
22 import os
23 from pkg_resources import resource_string
24 import shutil
25 import traceback
26 try:
27     from urllib.parse import unquote_plus
28     from socketserver import ThreadingMixIn
29     from http.server import BaseHTTPRequestHandler, HTTPServer
30 except ImportError:
31     from urllib import unquote_plus
32     from SocketServer import ThreadingMixIn
33     from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
34
35 import requests
36
37 from aoconversion import convert
38
39
def _derefconfig(value):
    """
    Resolve a configuration value that may point at a file.

    A value starting with '@' names a file (the text after the '@'); the
    first line of that file, stripped of surrounding whitespace, is returned.
    Any other value is returned unchanged.
    """
    if not value.startswith('@'):
        return value
    with open(value[1:], 'r') as handle:
        first_line = handle.readline()
    return first_line.strip()
45
46
class Config(object):
    """
    Configuration parameters as attributes, make sure the required ones are there,
    populate defaults.

    The required settings are the positional parameters of the constructor;
    the remaining ones carry defaults.  Unknown keyword arguments are accepted
    and ignored (``**extras``), so a configuration dictionary with additional
    keys can be splatted into the constructor.
    """
    def __init__(self, dcaeuser, onboardingurl, onboardinguser, onboardingpass, certfile, dockerregistry, dockeruser, dockerpass, http_proxy, https_proxy, no_proxy, acumosurl=None, interval=900, dockerhost='tcp://localhost:2375', tmpdir='/var/tmp/aoadapter', certverify=True, catalogs=None, port=None, **extras):
        self.dcaeuser = dcaeuser

        def x(fmt, *args, **kwargs):
            # Build an onboarding URL: base URL + formatted path.
            return onboardingurl + fmt.format(*args, **kwargs)
        self.oburl = x
        # Credentials are kept private and only handed out through
        # obauth() / dockerpass(), which dereference them on each call.
        self._onboardingpass = onboardingpass
        self._onboardinguser = onboardinguser
        self.acumosurl = acumosurl
        self.certfile = certfile
        self.certverify = certverify
        self.dockerhost = dockerhost
        self.dockerregistry = dockerregistry
        self.dockeruser = dockeruser
        self._dockerpass = dockerpass
        self.interval = interval
        self.tmpdir = tmpdir
        # Proxy settings are normalised so that "not configured" is always
        # the empty string.  Each setting is checked against itself (the
        # https value used to be gated on http_proxy by mistake).
        self.http_proxy = http_proxy if http_proxy is not None else ""
        self.https_proxy = https_proxy if https_proxy is not None else ""
        self.no_proxy = no_proxy if no_proxy is not None else ""
        # A single catalog may be given as a scalar; store it as a list.
        if catalogs is not None and not isinstance(catalogs, list):
            catalogs = [catalogs]
        self.catalogs = catalogs
        self.port = port

    def obauth(self):
        """Return the (user, password) tuple for the onboarding service."""
        return (self._onboardinguser, _derefconfig(self._onboardingpass))

    def dockerpass(self):
        """Return the docker registry password, dereferenced via _derefconfig."""
        return _derefconfig(self._dockerpass)
82
83
class _AcumosAccess(object):
    """
    Small HTTP client bound to one Acumos base URL.

    The client certificate and verification setting come from the
    configuration object; the base URL is stored with surrounding whitespace
    and trailing slashes removed.
    """
    def __init__(self, config, url):
        self.cert = config.certfile
        self.verify = config.certverify
        self.url = url.strip().rstrip('/')

    def _fetch(self, nurl, **options):
        """
        GET nurl with the configured cert/verify settings.

        A response with status 500 is retried exactly once; the final
        response is checked with raise_for_status() and returned.
        """
        response = requests.get(nurl, cert=self.cert, verify=self.verify, **options)
        if response.status_code == 500:
            response = requests.get(nurl, cert=self.cert, verify=self.verify, **options)
        response.raise_for_status()
        return response

    def artgetter(self, xrev, matcher):
        """
        Return a zero-argument function that downloads the first artifact of
        xrev accepted by matcher, or None when no artifact matches.

        The download itself (a streaming GET) only happens when the returned
        function is called.
        """
        for candidate in xrev['artifacts']:
            if not matcher(candidate):
                continue

            def fetch_content():
                nurl = '{}/artifacts/{}/content'.format(self.url, candidate['artifactId'])
                return self._fetch(nurl, stream=True)
            return fetch_content
        return None

    def jsonget(self, format, *args, **kwargs):
        """
        GET the path built from format.format(*args, **kwargs) below the base
        URL and return the 'content' member of the JSON reply.
        """
        nurl = self.url + format.format(*args, **kwargs)
        reply = self._fetch(nurl)
        return reply.json()['content']
110
111
def _x_proto_matcher(art):
    """ Is this artifact the x.proto file? (decided by the '.proto' name suffix) """
    artifact_name = art['name']
    return artifact_name.endswith('.proto')
115
116
def _x_zip_matcher(art):
    """ Is this artifact the x.zip file? (decided by the '.zip' name suffix) """
    artifact_name = art['name']
    return artifact_name.endswith('.zip')
120
121
def _md_json_matcher(art):
    """
    Is this artifact the metadata.json file?

    True when the artifact name contains 'metadata' and ends with '.json'.
    """
    name = art['name']
    # Use the membership operator and a logical 'and' instead of calling
    # __contains__ directly and combining booleans with bitwise '&'.
    return 'metadata' in name and name.endswith('.json')
125
126
def _walk(config):
    """
    Walk the Federation E5 interface of an Acumos instance

    Every revision of every solution in the selected catalogs is handed to
    onboard().  When config.catalogs is set, a catalog is selected if either
    its catalogId or its name is listed there; otherwise all catalogs are
    visited.
    """
    url = config.acumosurl
    callback = _makecallback(config)
    wanted = config.catalogs
    access = _AcumosAccess(config, url)
    for catalog in access.jsonget('/catalogs'):
        selected = wanted is None or catalog['catalogId'] in wanted or catalog['name'] in wanted
        if selected:
            for solution in access.jsonget('/solutions?catalogId={}', catalog['catalogId']):
                for revision in access.jsonget('/solutions/{}/revisions', solution['solutionId']):
                    onboard(access, callback, solution, revision['revisionId'])
141
142
def onboard(aa, callback, solution, revid):
    """
    Fetch one solution revision and pass it to callback.

    The revision details are read through aa.jsonget(); callback receives the
    solution name, the revision's version and modification stamp, the rating
    (ratingAverageTenths divided by 10.0) and one getter per required
    artifact (proto, zip, metadata json), each of which may be None.
    """
    xrev = aa.jsonget('/solutions/{}/revisions/{}', solution['solutionId'], revid)
    name = solution['name']
    version = xrev['version']
    modified = xrev['modified']
    rating = solution['ratingAverageTenths'] / 10.0
    proto_getter = aa.artgetter(xrev, _x_proto_matcher)
    zip_getter = aa.artgetter(xrev, _x_zip_matcher)
    metadata_getter = aa.artgetter(xrev, _md_json_matcher)
    callback(
        model_name=name,
        model_version=version,
        model_last_updated=modified,
        rating=rating,
        proto_getter=proto_getter,
        zip_getter=zip_getter,
        metadata_getter=metadata_getter,
    )
147
148
def _pullfile(source, dest):
    """
    Download an artifact into the file dest.

    dest is opened for binary writing first; source is then called to obtain
    the response, whose content is copied to the file in chunks of 65536
    bytes.
    """
    with open(dest, 'wb') as sink:
        response = source()
        sink.writelines(response.iter_content(65536))
153
154
# Module-level memo of data formats already handled, as (name, version)
# tuples.  Filled by the callback built in _makecallback so the same data
# format is not posted to the onboarding service again.
_loadedformats = set()
# Module-level memo of components already handled (or given up on), as
# (model_name, model_version) tuples of lower-cased strings.  Being module
# level, both sets are shared by every callback created in this process.
_loadedcomponents = set()
157
158
def scan(config):
    """
    Run one pass over the Acumos instance described by config.

    Delegates to _walk(config).
    """
    _walk(config)
161
162
def _makecallback(config):
    """
    Build the callback that onboards a single model revision.

    The working directory, the onboarding credentials and the onboarding URL
    builder are read from config once, when the callback is created.
    """
    workdir = config.tmpdir
    obauth = config.obauth()
    oburl = config.oburl

    def callback(model_name, model_version, model_last_updated, rating, proto_getter, zip_getter, metadata_getter):
        """
        Download the artifacts of one model revision, convert them, and post
        the resulting data formats and component spec to the onboarding
        service.  Revisions and data formats already recorded in the
        module-level memo sets are skipped.  Errors while posting are
        printed and re-raised.
        """
        model_name = model_name.lower()
        model_version = model_version.lower()
        compid = (model_name, model_version)
        if compid in _loadedcomponents:
            print('Skipping component {}: already analyzed'.format(compid))
            return
        if any(getter is None for getter in (proto_getter, zip_getter, metadata_getter)):
            print('Skipping component {}: does not have required artifacts'.format(compid))
            _loadedcomponents.add(compid)
            return

        # Start from an empty per-model directory (removal errors ignored).
        modeldir = '{}/{}'.format(workdir, model_name)
        shutil.rmtree(modeldir, True)
        os.makedirs(modeldir)

        downloads = (
            (proto_getter, '{}/model.proto'),
            (zip_getter, '{}/model.zip'),
            (metadata_getter, '{}/metadata.json'),
        )
        try:
            for getter, pattern in downloads:
                _pullfile(getter, pattern.format(modeldir))
        except Exception:
            # A failed download marks the revision as handled.
            print('Skipping component {}: artifact access error'.format(compid))
            _loadedcomponents.add(compid)
            return

        try:
            docker_uri, data_formats, spec = convert.gen_dcae_artifacts_for_model(config, model_name, model_version)
            shutil.rmtree(modeldir)
        except Exception:
            # Not added to the memo set: the revision is not marked as handled.
            print('Error analyzing artifacts for {}'.format(compid))
            traceback.print_exc()
            return

        for data_format in data_formats:
            ident = data_format['self']
            fmtid = (ident['name'], ident['version'])
            if fmtid in _loadedformats:
                print('Skipping data format {}: already analyzed'.format(fmtid))
                continue
            try:
                resp = requests.post(oburl('/dataformats'), json={'owner': config.dcaeuser, 'spec': data_format}, auth=obauth)
                if resp.status_code != 409:
                    resp.raise_for_status()
                    requests.patch(resp.json()['dataFormatUrl'], json={'owner': config.dcaeuser, 'status': 'published'}, auth=obauth).raise_for_status()
                    print('Loaded data format {}'.format(fmtid))
                else:
                    # 409 is treated as "already present" on the service.
                    print('Skipping data format {}: previously loaded'.format(fmtid))
                _loadedformats.add(fmtid)
            except Exception:
                print('Error loading data format {}'.format(fmtid))
                traceback.print_exc()
                raise

        try:
            resp = requests.post(oburl('/components'), json={'owner': config.dcaeuser, 'spec': spec}, auth=obauth)
            if resp.status_code != 409:
                resp.raise_for_status()
                requests.patch(resp.json()['componentUrl'], json={'owner': config.dcaeuser, 'status': 'published'}, auth=obauth).raise_for_status()
                print('Loaded component {}'.format(compid))
            else:
                # 409 is treated as "already present" on the service.
                print('Skipping component {}: previously loaded'.format(compid))
            _loadedcomponents.add(compid)
        except Exception:
            print('Error loading component {}'.format(compid))
            traceback.print_exc()
            raise
    return callback
231
232
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer combined with ThreadingMixIn."""
    # NOTE(review): the standard library documents protocol_version as an
    # attribute of the request handler class (BaseHTTPRequestHandler);
    # confirm that setting it on the server class has the intended effect.
    protocol_version = "HTTP/1.1"
235
236
class Apihandler(BaseHTTPRequestHandler):
    """
    Request handler for the adapter's HTTP API.

    GET serves the index page and the catalog / solution / revision listings
    of the Acumos instance named by the 'acumos' query parameter; POST to
    /acumos-adapter/onboard.js onboards the selected revisions.  The handler
    reads 'index', 'config' and 'callback' from its server object.
    """
    def doqp(self):
        """
        Strip the query string from self.path and decode it into the
        dictionary self.qparams (empty when there is no query string).

        Keys and values are decoded with unquote_plus.  A key given without
        '=' maps to the empty string and empty segments (as in a trailing
        '&' or a bare '?') are ignored, so malformed queries do not raise.
        """
        self.qparams = {}
        if not self.path or '?' not in self.path:
            return
        self.path, qp = self.path.split('?', 1)
        for x in qp.split('&'):
            if not x:
                continue
            # partition() never fails, unlike unpacking split('=', 1).
            k, _, v = x.partition('=')
            self.qparams[unquote_plus(k)] = unquote_plus(v)

    def replyjson(self, body, ctype='application/json'):
        """Send body serialised as UTF-8 encoded JSON with status 200."""
        self.replyraw(json.dumps(body).encode('utf-8'), ctype)

    def replyraw(self, data, ctype):
        """Send the bytes in data with status 200 and the given content type."""
        self.send_response(200)
        self.send_header('Content-Type', ctype)
        self.send_header('Content-Length', len(data))
        self.end_headers()
        self.wfile.write(data)

    def do_GET(self):
        """
        Serve the index page or one of the listing endpoints.

        Listing endpoints require the 'acumos' query parameter (plus
        'catalogId' or 'solutionId' where applicable) and answer 400 when it
        is missing; unknown paths answer 404.
        """
        self.doqp()
        if self.path in ('/', '/index.html', '/acumos-adapter/', '/acumos-adapter/index.html'):
            self.replyraw(self.server.index, 'text/html')
            return
        if 'acumos' not in self.qparams:
            self.send_error(400)
            return
        aa = _AcumosAccess(self.server.config, self.qparams['acumos'])
        if self.path == '/acumos-adapter/listCatalogs.js':
            self.replyjson([{'name': x['name'], 'id': x['catalogId']} for x in aa.jsonget('/catalogs')])
            return
        if self.path == '/acumos-adapter/listSolutions.js':
            if 'catalogId' not in self.qparams:
                self.send_error(400)
                return
            self.replyjson([{'name': x['name'], 'id': x['solutionId']} for x in aa.jsonget('/solutions?catalogId={}', self.qparams['catalogId'])])
            return
        if self.path == '/acumos-adapter/listRevisions.js':
            if 'solutionId' not in self.qparams:
                self.send_error(400)
                return
            self.replyjson([{'name': x['version'], 'id': x['revisionId']} for x in aa.jsonget('/solutions/{}/revisions', self.qparams['solutionId'])])
            return
        self.send_error(404)

    def do_POST(self):
        """
        Onboard revisions from the Acumos instance named by 'acumos'.

        The scope narrows with the parameters supplied: no 'catalogId' means
        every catalog, no 'solutionId' every solution of the catalog, no
        'revisionId' every revision of the solution; otherwise exactly one
        revision.  Replies 'OK' on completion; any other path, or a missing
        'acumos' parameter, answers 400.
        """
        self.doqp()
        if self.path == '/acumos-adapter/onboard.js':
            if 'acumos' not in self.qparams:
                self.send_error(400)
                return
            aa = _AcumosAccess(self.server.config, self.qparams['acumos'])
            callback = self.server.callback
            if 'catalogId' not in self.qparams:
                for catalog in aa.jsonget('/catalogs'):
                    for solution in aa.jsonget('/solutions?catalogId={}', catalog['catalogId']):
                        for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
                            onboard(aa, callback, solution, revision['revisionId'])
            elif 'solutionId' not in self.qparams:
                for solution in aa.jsonget('/solutions?catalogId={}', self.qparams['catalogId']):
                    for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
                        onboard(aa, callback, solution, revision['revisionId'])
            elif 'revisionId' not in self.qparams:
                solution = aa.jsonget('/solutions/{}', self.qparams['solutionId'])
                for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
                    onboard(aa, callback, solution, revision['revisionId'])
            else:
                solution = aa.jsonget('/solutions/{}', self.qparams['solutionId'])
                onboard(aa, callback, solution, self.qparams['revisionId'])
            self.replyraw('OK'.encode('utf-8'), 'text/plain')
            return
        self.send_error(400)
310
311
def serve(config):
    """
    Run the adapter's HTTP API on config.port, on all interfaces, until the
    process is stopped.

    The server object carries the configuration, an onboarding callback and
    the packaged index.html for use by the request handler.
    """
    httpd = ThreadedHTTPServer(('', config.port), Apihandler)
    httpd.config = config
    httpd.callback = _makecallback(config)
    httpd.index = resource_string(__name__, 'index.html')
    httpd.serve_forever()