Merge "Modify metadata matcher"
[dcaegen2/platform.git] / adapter / acumos / aoconversion / scanner.py
1 # ============LICENSE_START====================================================
2 # org.onap.dcae
3 # =============================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # =============================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END======================================================
18
19 import json
20 import os
21 from pkg_resources import resource_string
22 import shutil
23 import traceback
24 try:
25     from urllib.parse import unquote_plus
26     from socketserver import ThreadingMixIn
27     from http.server import BaseHTTPRequestHandler, HTTPServer
28 except ImportError:
29     from urllib import unquote_plus
30     from SocketServer import ThreadingMixIn
31     from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
32
33 import requests
34
35 from aoconversion import convert
36
37
38 def _derefconfig(value):
39     if value.startswith('@'):
40         with open(value[1:], 'r') as f:
41             return f.readline().strip()
42     return value
43
44
class Config(object):
    """
    Configuration parameters as attributes, make sure the required ones are there,
    populate defaults.

    The first eight constructor parameters are required; the remainder have
    defaults.  Unrecognized keyword arguments are accepted and ignored.
    The onboarding and docker passwords are kept private and only exposed
    through obauth() / dockerpass(), which resolve '@file' indirections at
    call time.
    """
    def __init__(self, dcaeuser, onboardingurl, onboardinguser, onboardingpass, certfile, dockerregistry, dockeruser, dockerpass, acumosurl=None, interval=900, dockerhost='tcp://localhost:2375', tmpdir='/var/tmp/aoadapter', certverify=True, catalogs=None, port=None, **extras):
        self.dcaeuser = dcaeuser

        def oburl(fmt, *args, **kwargs):
            # Build an onboarding API URL: base URL + formatted path.
            return onboardingurl + fmt.format(*args, **kwargs)
        self.oburl = oburl
        self._onboardingpass = onboardingpass
        self._onboardinguser = onboardinguser
        self.acumosurl = acumosurl
        self.certfile = certfile
        self.certverify = certverify
        self.dockerhost = dockerhost
        self.dockerregistry = dockerregistry
        self.dockeruser = dockeruser
        self._dockerpass = dockerpass
        self.interval = interval
        self.tmpdir = tmpdir
        # Normalize catalogs to a list (or None).  A tuple/set of catalogs is
        # converted element-wise; wrapping it whole would produce a list whose
        # only member is the collection, which no catalog id or name could
        # ever match.  Any other single value is wrapped.
        if catalogs is not None and not isinstance(catalogs, list):
            if isinstance(catalogs, (tuple, set, frozenset)):
                catalogs = list(catalogs)
            else:
                catalogs = [catalogs]
        self.catalogs = catalogs
        self.port = port

    def obauth(self):
        """ Return the (user, password) pair for the onboarding API. """
        return (self._onboardinguser, _derefconfig(self._onboardingpass))

    def dockerpass(self):
        """ Return the docker registry password, resolving any '@file' indirection. """
        return _derefconfig(self._dockerpass)
77
78
79 class _AcumosAccess(object):
80     def __init__(self, config, url):
81         self.cert = config.certfile
82         self.verify = config.certverify
83         self.url = url.strip().rstrip('/')
84
85     def artgetter(self, xrev, matcher):
86         for art in xrev['artifacts']:
87             if matcher(art):
88                 def ret():
89                     nurl = '{}/artifacts/{}/content'.format(self.url, art['artifactId'])
90                     resp = requests.get(nurl, stream=True, cert=self.cert, verify=self.verify)
91                     if resp.status_code == 500:
92                         resp = requests.get(nurl, stream=True, cert=self.cert, verify=self.verify)
93                     resp.raise_for_status()
94                     return resp
95                 return ret
96         return None
97
98     def jsonget(self, format, *args, **kwargs):
99         nurl = self.url + format.format(*args, **kwargs)
100         resp = requests.get(nurl, cert=self.cert, verify=self.verify)
101         if resp.status_code == 500:
102             resp = requests.get(nurl, cert=self.cert, verify=self.verify)
103         resp.raise_for_status()
104         return resp.json()['content']
105
106
107 def _x_proto_matcher(art):
108     """ Is this artifact the x.proto file? """
109     return art['name'].endswith('.proto')
110
111
112 def _x_zip_matcher(art):
113     """ Is this artifact the x.zip file? """
114     return art['name'].endswith('.zip')
115
116
117 def _md_json_matcher(art):
118     """ Is this artifact the metadata.json file? """
119     return art['name'].__contains__('metadata') & art['name'].endswith('.json')
120
121
def _walk(config):
    """
    Traverse the Federation E5 interface of the configured Acumos instance
    and hand every revision of every solution in the selected catalogs to
    the onboarding callback.  When config.catalogs is None all catalogs are
    visited; otherwise a catalog is visited if its id or its name is listed.
    """
    base_url = config.acumosurl
    handler = _makecallback(config)
    wanted = config.catalogs
    access = _AcumosAccess(config, base_url)
    for cat in access.jsonget('/catalogs'):
        selected = wanted is None or cat['catalogId'] in wanted or cat['name'] in wanted
        if not selected:
            continue
        for sol in access.jsonget('/solutions?catalogId={}', cat['catalogId']):
            for rev in access.jsonget('/solutions/{}/revisions', sol['solutionId']):
                onboard(access, handler, sol, rev['revisionId'])
136
137
def onboard(aa, callback, solution, revid):
    """
    Fetch the full record of revision `revid` of `solution` through `aa`
    and pass the model details, plus getters for its proto, zip and
    metadata artifacts, to `callback` as keyword arguments.
    """
    xrev = aa.jsonget('/solutions/{}/revisions/{}', solution['solutionId'], revid)
    details = {
        'model_name': solution['name'],
        'model_version': xrev['version'],
        'model_last_updated': xrev['modified'],
        # Rating arrives in tenths; scale it to a float.
        'rating': solution['ratingAverageTenths'] / 10.0,
        'proto_getter': aa.artgetter(xrev, _x_proto_matcher),
        'zip_getter': aa.artgetter(xrev, _x_zip_matcher),
        'metadata_getter': aa.artgetter(xrev, _md_json_matcher),
    }
    callback(**details)
142
143
144 def _pullfile(source, dest):
145     with open(dest, 'wb') as f:
146         for chunk in source().iter_content(65536):
147             f.write(chunk)
148
149
# Process-wide registries used by the onboarding callback to avoid repeating
# work.  _loadedformats holds (name, version) pairs of data formats already
# loaded (or found to be previously loaded); _loadedcomponents holds
# (model_name, model_version) pairs of components already handled.
_loadedformats = set()
_loadedcomponents = set()
152
153
def scan(config):
    """
    Run one scan of the configured Acumos instance by walking it with the
    given configuration.
    """
    _walk(config)
156
157
def _makecallback(config):
    """
    Build the onboarding callback bound to `config`.

    The returned callback takes the keyword arguments produced by onboard()
    (model name/version/last-updated, rating and three artifact getters),
    downloads the artifacts into a per-model directory under config.tmpdir,
    converts them, and then loads and publishes the resulting data formats
    and component spec through the onboarding API.  Items already recorded
    in the module-level registries are skipped.
    """
    workdir = config.tmpdir
    obauth = config.obauth()
    oburl = config.oburl

    def _publish(kind, ident, path, spec, urlkey, loaded):
        """
        POST `spec` to the onboarding API at `path`, then PATCH the URL found
        under `urlkey` in the reply to mark it published.  A 409 reply means
        the item was loaded previously.  On success or 409, `ident` is added
        to the `loaded` registry.  Any failure is reported and re-raised.
        """
        try:
            resp = requests.post(oburl(path), json={'owner': config.dcaeuser, 'spec': spec}, auth=obauth)
            if resp.status_code == 409:
                print('Skipping {} {}: previously loaded'.format(kind, ident))
                loaded.add(ident)
                return
            resp.raise_for_status()
            requests.patch(resp.json()[urlkey], json={'owner': config.dcaeuser, 'status': 'published'}, auth=obauth).raise_for_status()
            print('Loaded {} {}'.format(kind, ident))
            loaded.add(ident)
        except Exception:
            print('Error loading {} {}'.format(kind, ident))
            traceback.print_exc()
            raise

    def callback(model_name, model_version, model_last_updated, rating, proto_getter, zip_getter, metadata_getter):
        model_name = model_name.lower()
        model_version = model_version.lower()
        compid = (model_name, model_version)
        if compid in _loadedcomponents:
            print('Skipping component {}: already analyzed'.format(compid))
            return
        if proto_getter is None or zip_getter is None or metadata_getter is None:
            print('Skipping component {}: does not have required artifacts'.format(compid))
            _loadedcomponents.add(compid)
            return
        # Start from a clean per-model directory (errors removing a stale
        # one are ignored).
        modeldir = '{}/{}'.format(workdir, model_name)
        shutil.rmtree(modeldir, True)
        os.makedirs(modeldir)
        try:
            _pullfile(proto_getter, '{}/model.proto'.format(modeldir))
            _pullfile(zip_getter, '{}/model.zip'.format(modeldir))
            _pullfile(metadata_getter, '{}/metadata.json'.format(modeldir))
        except Exception:
            print('Skipping component {}: artifact access error'.format(compid))
            # Report the cause as the other error paths do, rather than
            # discarding it.
            traceback.print_exc()
            _loadedcomponents.add(compid)
            return
        try:
            _docker_uri, data_formats, spec = convert.gen_dcae_artifacts_for_model(config, model_name, model_version)
            shutil.rmtree(modeldir)
        except Exception:
            # Not recorded in _loadedcomponents, so a later call will try
            # this component again.
            print('Error analyzing artifacts for {}'.format(compid))
            traceback.print_exc()
            return
        for data_format in data_formats:
            fmtid = (data_format['self']['name'], data_format['self']['version'])
            if fmtid in _loadedformats:
                print('Skipping data format {}: already analyzed'.format(fmtid))
                continue
            _publish('data format', fmtid, '/dataformats', data_format, 'dataFormatUrl', _loadedformats)
        _publish('component', compid, '/components', spec, 'componentUrl', _loadedcomponents)
    return callback
226
227
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """
    HTTPServer combined with ThreadingMixIn.
    """
    # NOTE(review): the standard library documents protocol_version as a
    # request-handler attribute; confirm setting it on the server class has
    # the intended effect.
    protocol_version = "HTTP/1.1"
230
231
class Apihandler(BaseHTTPRequestHandler):
    """
    Request handler for the adapter's HTTP API: serves the index page,
    JSON listings of catalogs/solutions/revisions of a caller-named Acumos
    instance (GET), and on-demand onboarding (POST).
    """
    def doqp(self):
        """
        Split the query string off self.path and decode it into self.qparams.

        Segments without '=' become keys with an empty value and empty
        segments (e.g. from a trailing '&' or a bare '?') are ignored, so a
        malformed query string cannot raise.
        """
        self.qparams = {}
        if not self.path or '?' not in self.path:
            return
        self.path, qp = self.path.split('?', 1)
        for x in qp.split('&'):
            if not x:
                continue
            k, _, v = x.partition('=')
            self.qparams[unquote_plus(k)] = unquote_plus(v)

    def replyjson(self, body, ctype='application/json'):
        """ Send `body` serialized as UTF-8 encoded JSON with a 200 status. """
        self.replyraw(json.dumps(body).encode('utf-8'), ctype)

    def replyraw(self, data, ctype):
        """ Send the bytes `data` with a 200 status and the given content type. """
        self.send_response(200)
        self.send_header('Content-Type', ctype)
        self.send_header('Content-Length', len(data))
        self.end_headers()
        self.wfile.write(data)

    def do_GET(self):
        """
        Serve the index page, or one of the listing endpoints.  Listing
        endpoints require the 'acumos' query parameter (400 otherwise);
        unknown paths yield 404.
        """
        self.doqp()
        if self.path in ('/', '/index.html', '/acumos-adapter/', '/acumos-adapter/index.html'):
            self.replyraw(self.server.index, 'text/html')
            return
        if 'acumos' not in self.qparams:
            self.send_error(400)
            return
        # NOTE(review): the 'acumos' URL comes from the caller and is fetched
        # with the configured client certificate; the ids below are placed
        # into upstream URLs unescaped.  Confirm callers are trusted.
        aa = _AcumosAccess(self.server.config, self.qparams['acumos'])
        if self.path == '/acumos-adapter/listCatalogs.js':
            self.replyjson([{'name': x['name'], 'id': x['catalogId']} for x in aa.jsonget('/catalogs')])
            return
        if self.path == '/acumos-adapter/listSolutions.js':
            if 'catalogId' not in self.qparams:
                self.send_error(400)
                return
            self.replyjson([{'name': x['name'], 'id': x['solutionId']} for x in aa.jsonget('/solutions?catalogId={}', self.qparams['catalogId'])])
            return
        if self.path == '/acumos-adapter/listRevisions.js':
            if 'solutionId' not in self.qparams:
                self.send_error(400)
                return
            self.replyjson([{'name': x['version'], 'id': x['revisionId']} for x in aa.jsonget('/solutions/{}/revisions', self.qparams['solutionId'])])
            return
        self.send_error(404)

    def do_POST(self):
        """
        Handle /acumos-adapter/onboard.js.  The scope of the onboarding
        narrows with the parameters supplied: everything, one catalog, one
        solution, or a single revision.  Replies 'OK' when done; any other
        path, or a missing 'acumos' parameter, yields 400.
        """
        self.doqp()
        if self.path == '/acumos-adapter/onboard.js':
            if 'acumos' not in self.qparams:
                self.send_error(400)
                return
            aa = _AcumosAccess(self.server.config, self.qparams['acumos'])
            callback = self.server.callback
            if 'catalogId' not in self.qparams:
                # No catalog given: onboard every revision of every solution.
                for catalog in aa.jsonget('/catalogs'):
                    for solution in aa.jsonget('/solutions?catalogId={}', catalog['catalogId']):
                        for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
                            onboard(aa, callback, solution, revision['revisionId'])
            elif 'solutionId' not in self.qparams:
                # Catalog only: every revision of every solution in it.
                for solution in aa.jsonget('/solutions?catalogId={}', self.qparams['catalogId']):
                    for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
                        onboard(aa, callback, solution, revision['revisionId'])
            elif 'revisionId' not in self.qparams:
                # Solution only: every revision of that solution.
                solution = aa.jsonget('/solutions/{}', self.qparams['solutionId'])
                for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
                    onboard(aa, callback, solution, revision['revisionId'])
            else:
                # Exactly one revision.
                solution = aa.jsonget('/solutions/{}', self.qparams['solutionId'])
                onboard(aa, callback, solution, self.qparams['revisionId'])
            self.replyraw('OK'.encode('utf-8'), 'text/plain')
            return
        self.send_error(400)
305
306
def serve(config):
    """
    Run the adapter's HTTP API on config.port (all interfaces) until
    interrupted.  The server object carries the configuration, the
    onboarding callback and the packaged index.html for the handlers.
    """
    httpd = ThreadedHTTPServer(('', config.port), Apihandler)
    httpd.config = config
    httpd.callback = _makecallback(config)
    httpd.index = resource_string(__name__, 'index.html')
    httpd.serve_forever()