1 # ============LICENSE_START====================================================
3 # =============================================================================
4 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
5 # =============================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END======================================================
21 from pkg_resources import resource_string
25 from urllib.parse import unquote_plus
26 from socketserver import ThreadingMixIn
27 from http.server import BaseHTTPRequestHandler, HTTPServer
29 from urllib import unquote_plus
30 from SocketServer import ThreadingMixIn
31 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
35 from aoconversion import convert
def _derefconfig(value):
    """Resolve a configuration value that may be stored in a file.

    A value beginning with ``@`` names a file (the text following the ``@``).
    The first line of that file, stripped of surrounding whitespace, is
    returned.  Any other value is returned unchanged.

    :param value: literal setting, or ``@<path>`` pointing at a file.
    :returns: the resolved setting as a string.
    :raises IOError/OSError: if the referenced file cannot be opened.
    """
    if value.startswith('@'):
        with open(value[1:], 'r') as f:
            return f.readline().strip()
    # Literal value: hand it back rather than falling through to None.
    return value
47 Configuration parameters as attributes, make sure the required ones are there,
50 def __init__(self, dcaeurl, dcaeuser, onboardingurl, onboardinguser, onboardingpass, certfile, dockerregistry, dockeruser, dockerpass, acumosurl=None, interval=900, dockerhost='tcp://localhost:2375', tmpdir='/var/tmp/aoadapter', certverify=True, catalogs=None, port=None, **extras):
51 self.dcaeurl = dcaeurl
52 self.dcaeuser = dcaeuser
54 def x(fmt, *args, **kwargs):
55 return onboardingurl + fmt.format(*args, **kwargs)
57 self._onboardingpass = onboardingpass
58 self._onboardinguser = onboardinguser
59 self.acumosurl = acumosurl
60 self.certfile = certfile
61 self.certverify = certverify
62 self.dockerhost = dockerhost
63 self.dockerregistry = dockerregistry
64 self.dockeruser = dockeruser
65 self._dockerpass = dockerpass
66 self.interval = interval
68 if catalogs is not None and type(catalogs) is not list:
70 self.catalogs = catalogs
74 return (self._onboardinguser, _derefconfig(self._onboardingpass))
77 return _derefconfig(self._dockerpass)
80 class _AcumosAccess(object):
def __init__(self, config, url):
    """Remember the TLS settings from ``config`` and a normalized base URL.

    ``config.certfile`` and ``config.certverify`` are stored as ``cert`` and
    ``verify``.  ``url`` has surrounding whitespace removed and any trailing
    slashes dropped before being stored as ``url``.
    """
    self.cert = config.certfile
    self.verify = config.certverify
    trimmed = url.strip()
    self.url = trimmed.rstrip('/')
86 def artgetter(self, xrev, matcher):
87 for art in xrev['artifacts']:
90 nurl = '{}/artifacts/{}/content'.format(self.url, art['artifactId'])
91 resp = requests.get(nurl, stream=True, cert=self.cert, verify=self.verify)
92 if resp.status_code == 500:
93 resp = requests.get(nurl, stream=True, cert=self.cert, verify=self.verify)
94 resp.raise_for_status()
def jsonget(self, format, *args, **kwargs):
    """GET a JSON document from the configured server and return its 'content'.

    The request URL is ``self.url`` followed by ``format`` expanded with
    ``str.format(*args, **kwargs)``.  The request is issued with the stored
    ``cert`` and ``verify`` settings.  If the first response has status 500
    the request is issued one more time.  ``raise_for_status()`` is then
    called on the response, and the ``'content'`` member of the decoded
    JSON body is returned.
    """
    target = self.url + format.format(*args, **kwargs)
    tls_options = {'cert': self.cert, 'verify': self.verify}
    reply = requests.get(target, **tls_options)
    if reply.status_code == 500:
        # A single retry on an internal server error.
        reply = requests.get(target, **tls_options)
    reply.raise_for_status()
    document = reply.json()
    return document['content']
def _x_proto_matcher(art):
    """Tell whether an artifact entry names a ``.proto`` file."""
    artifact_name = art['name']
    return artifact_name.endswith('.proto')
def _x_zip_matcher(art):
    """Tell whether an artifact entry names a ``.zip`` file."""
    artifact_name = art['name']
    return artifact_name.endswith('.zip')
def _md_json_matcher(art):
    """Tell whether an artifact entry names a ``.json`` file."""
    artifact_name = art['name']
    return artifact_name.endswith('.json')
125 Walk the Federation E5 interface of an Acumos instance
127 url = config.acumosurl
128 callback = _makecallback(config)
129 catalogs = config.catalogs
130 aa = _AcumosAccess(config, url)
131 for catalog in aa.jsonget('/catalogs'):
132 if catalogs is not None and catalog['catalogId'] not in catalogs and catalog['name'] not in catalogs:
134 for solution in aa.jsonget('/solutions?catalogId={}', catalog['catalogId']):
135 for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
136 onboard(aa, callback, solution, revision['revisionId'])
def onboard(aa, callback, solution, revid):
    """Look up one revision of a solution and pass its details to ``callback``.

    The revision record is fetched through ``aa.jsonget``.  ``callback`` is
    then invoked with keyword arguments only: the solution name, the
    revision's version and modification stamp, the solution rating (tenths
    divided by 10.0), and three getters obtained from ``aa.artgetter`` for
    the proto, zip and metadata artifacts of the revision.
    """
    xrev = aa.jsonget('/solutions/{}/revisions/{}', solution['solutionId'], revid)
    details = {}
    details['model_name'] = solution['name']
    details['model_version'] = xrev['version']
    details['model_last_updated'] = xrev['modified']
    details['rating'] = solution['ratingAverageTenths'] / 10.0
    details['proto_getter'] = aa.artgetter(xrev, _x_proto_matcher)
    details['zip_getter'] = aa.artgetter(xrev, _x_zip_matcher)
    details['metadata_getter'] = aa.artgetter(xrev, _md_json_matcher)
    callback(**details)
def _pullfile(source, dest):
    """Stream the content produced by ``source`` into the file ``dest``.

    :param source: zero-argument callable; its result must offer
        ``iter_content(chunk_size)`` yielding byte chunks.
    :param dest: path of the file to (over)write, opened in binary mode.
    """
    with open(dest, 'wb') as f:
        # Copy in 64 KiB pieces so the whole artifact is never held in memory.
        for chunk in source().iter_content(65536):
            f.write(chunk)
# (name, version) pairs of data formats that have already been handled.
_loadedformats = set()
# (model_name, model_version) pairs of components that have already been handled.
_loadedcomponents = set()
158 def _makecallback(config):
159 workdir = config.tmpdir
160 obauth = config.obauth()
163 def callback(model_name, model_version, model_last_updated, rating, proto_getter, zip_getter, metadata_getter):
164 model_name = model_name.lower()
165 model_version = model_version.lower()
166 compid = (model_name, model_version)
167 if compid in _loadedcomponents:
168 print('Skipping component {}: already analyzed'.format(compid))
170 if proto_getter is None or zip_getter is None or metadata_getter is None:
171 print('Skipping component {}: does not have required artifacts'.format(compid))
172 _loadedcomponents.add(compid)
174 modeldir = '{}/{}'.format(workdir, model_name)
175 shutil.rmtree(modeldir, True)
176 os.makedirs(modeldir)
178 _pullfile(proto_getter, '{}/model.proto'.format(modeldir))
179 _pullfile(zip_getter, '{}/model.zip'.format(modeldir))
180 _pullfile(metadata_getter, '{}/metadata.json'.format(modeldir))
182 print('Skipping component {}: artifact access error'.format(compid))
183 _loadedcomponents.add(compid)
186 docker_uri, data_formats, spec = convert.gen_dcae_artifacts_for_model(config, model_name, model_version)
187 shutil.rmtree(modeldir)
189 print('Error analyzing artifacts for {}'.format(compid))
190 traceback.print_exc()
192 for data_format in data_formats:
193 fmtid = (data_format['self']['name'], data_format['self']['version'])
194 if fmtid in _loadedformats:
195 print('Skipping data format {}: already analyzed'.format(fmtid))
198 resp = requests.post(oburl('/dataformats'), json={'owner': config.dcaeuser, 'spec': data_format}, auth=obauth)
199 if resp.status_code == 409:
200 print('Skipping data format {}: previously loaded'.format(fmtid))
201 _loadedformats.add(fmtid)
203 resp.raise_for_status()
204 requests.patch(resp.json()['dataFormatUrl'], json={'owner': config.dcaeuser, 'status': 'published'}, auth=obauth).raise_for_status()
205 print('Loaded data format {}'.format(fmtid))
206 _loadedformats.add(fmtid)
208 print('Error loading data format {}'.format(fmtid))
209 traceback.print_exc()
212 resp = requests.post(oburl('/components'), json={'owner': config.dcaeuser, 'spec': spec}, auth=obauth)
213 if resp.status_code == 409:
214 print('Skipping component {}: previously loaded'.format(compid))
215 _loadedcomponents.add(compid)
217 resp.raise_for_status()
218 requests.patch(resp.json()['componentUrl'], json={'owner': config.dcaeuser, 'status': 'published'}, auth=obauth).raise_for_status()
219 print('Loaded component {}'.format(compid))
220 _loadedcomponents.add(compid)
222 print('Error loading component {}'.format(compid))
223 traceback.print_exc()
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """``HTTPServer`` combined with ``ThreadingMixIn``."""

    # NOTE(review): the standard library documents ``protocol_version`` as an
    # attribute of the request handler class; confirm whether setting it on
    # the server class has the intended effect.
    protocol_version = 'HTTP/1.1'
232 class Apihandler(BaseHTTPRequestHandler):
235 if not self.path or '?' not in self.path:
237 self.path, qp = self.path.split('?', 1)
238 for x in qp.split('&'):
239 k, v = x.split('=', 1)
240 self.qparams[unquote_plus(k)] = unquote_plus(v)
def replyjson(self, body, ctype='application/json'):
    """Serialize ``body`` as JSON, encode it as UTF-8 and send it via ``replyraw``."""
    text = json.dumps(body)
    payload = text.encode('utf-8')
    self.replyraw(payload, ctype)
def replyraw(self, data, ctype):
    """Send a complete 200 response whose body is the bytes ``data``.

    :param data: response body, already encoded as bytes.
    :param ctype: value for the ``Content-Type`` header.
    """
    self.send_response(200)
    self.send_header('Content-Type', ctype)
    self.send_header('Content-Length', str(len(data)))
    # The header section must be terminated before any body bytes are written.
    self.end_headers()
    self.wfile.write(data)
254 if self.path == '/' or self.path == '/index.html' or self.path == '/acumos-adapter/' or self.path == '/acumos-adapter/index.html':
255 self.replyraw(self.server.index, 'text/html')
257 if 'acumos' not in self.qparams:
260 aa = _AcumosAccess(self.server.config, self.qparams['acumos'])
261 if self.path == '/acumos-adapter/listCatalogs.js':
262 self.replyjson([{'name': x['name'], 'id': x['catalogId']} for x in aa.jsonget('/catalogs')])
264 if self.path == '/acumos-adapter/listSolutions.js':
265 if 'catalogId' not in self.qparams:
268 self.replyjson([{'name': x['name'], 'id': x['solutionId']} for x in aa.jsonget('/solutions?catalogId={}', self.qparams['catalogId'])])
270 if self.path == '/acumos-adapter/listRevisions.js':
271 if 'solutionId' not in self.qparams:
274 self.replyjson([{'name': x['version'], 'id': x['revisionId']} for x in aa.jsonget('/solutions/{}/revisions', self.qparams['solutionId'])])
280 if self.path == '/acumos-adapter/onboard.js':
281 if 'acumos' not in self.qparams:
284 aa = _AcumosAccess(self.server.config, self.qparams['acumos'])
285 callback = self.server.callback
286 if 'catalogId' not in self.qparams:
287 for catalog in aa.jsonget('/catalogs'):
288 for solution in aa.jsonget('/solutions?catalogId={}', catalog['catalogId']):
289 for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
290 onboard(aa, callback, solution, revision['revisionId'])
291 elif 'solutionId' not in self.qparams:
292 for solution in aa.jsonget('/solutions?catalogId={}', self.qparams['catalogId']):
293 for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
294 onboard(aa, callback, solution, revision['revisionId'])
295 elif 'revisionId' not in self.qparams:
296 solution = aa.jsonget('/solutions/{}', self.qparams['solutionId'])
297 for revision in aa.jsonget('/solutions/{}/revisions', solution['solutionId']):
298 onboard(aa, callback, solution, revision['revisionId'])
300 solution = aa.jsonget('/solutions/{}', self.qparams['solutionId'])
301 onboard(aa, callback, solution, self.qparams['revisionId'])
302 self.replyraw('OK'.encode('utf-8'), 'text/plain')
308 server = ThreadedHTTPServer(('', config.port), Apihandler)
309 server.config = config
310 server.callback = _makecallback(config)
311 server.index = resource_string(__name__, 'index.html')
312 server.serve_forever()