### # ============LICENSE_START======================================================= # ONAP : ccsdk distribution web # ================================================================================ # Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. # All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= ### import subprocess import os import json import zipfile import re import uuid import urllib3 import shutil import re import ssl urllib3.disable_warnings() APPLICATION_LISTFILE="/app/odlux.application.list" INIT_FOLDER="/app/init.d" ODLUX_BASE_FOLDER='/app/odlux' INDEX_HTML=ODLUX_BASE_FOLDER+'/index.html' INDEX_HTML_TEMPLATE=INDEX_HTML+'.template' DEFAULT_APPLICATIONS=["connectApp" "faultApp" "maintenanceApp" "configurationApp" "performanceHistoryApp" "inventoryApp" "eventLogApp" "mediatorApp" "helpApp"] http = urllib3.PoolManager(cert_reqs=ssl.CERT_NONE) def exec(command): output = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE).stdout.read() return output def execToStdOut(commandArray): process = subprocess.Popen(commandArray, shell=False) process.communicate() def download(url, dst): print("downloading from {}...".format(url),end="") with open(dst, 'wb') as out_file: resp= http.request('GET',url, preload_content=False) shutil.copyfileobj(resp, out_file) resp.release_conn() print("done") def getEnv(key, defaultValue=None): x=os.getenv(key) return x if x is not None and len(x)>0 else defaultValue def sedInFile(old, nu, fn): execToStdOut(['sed', '-i', 's|{}|{}|g'.format(old,nu),fn]) def add_application(name, index, file=None): apps = load_applications() if index==0: print("no index given. put it to last position") index=apps[len(apps)-1]['index']+10 apps.append(dict(index=index,name=name)) if file is not None and os.path.exists(file): extract(file) else: print('unable to find file {}'.format(file)) write_applications(apps) print("{} installed on index {}".format(name, index)) def initial_load(): files = os.listdir(INIT_FOLDER) regex = r"([0-9]+)([a-zA-Z]+)\.(jar|zip)" regexUrl = r"([0-9]+)([a-zA-Z]+)\.(url)" for file in files: matches = re.finditer(regex,file) match = next(matches, None) matchesUrl = re.finditer(regexUrl,file) matchUrl = next(matchesUrl, None) if match is not None: print("installing {}".format(file)) index = int(match.group(1)) name = match.group(2) add_application(name,index,INIT_FOLDER+'/'+file) elif matchUrl is not None: print("installing {}".format(file)) index = int(match.group(1)) name = match.group(2) add_application(name,index,INIT_FOLDER+'/'+file) else: print("no index naming format found. try to autodetect") infos = autoDetectInfosFromJar(file) if infos is None: print("unable to detect index and application name for {}".format(file)) else: add_application(infos['name'],infos['index'],INIT_FOLDER+'/'+file) def containsBlueprintExpression(file) -> bool: print("check if file {} is blueprint".format(file)) with open(file, 'r') as fp: lines = fp.readlines() for line in lines: if "[^<]*<\/script>/