import json
import threading
import time

import flask
import requests
from flask import request

app = flask.Flask(__name__)
app.config["DEBUG"] = True


def _load_json_file(path):
    """Load the JSON document stored at *path*.

    Returns:
        ``(data, None)`` when the file was parsed and is non-empty, otherwise
        ``({"Error": "Unable to read file"}, 503)``. A missing, unreadable or
        malformed file is reported through the same error tuple instead of
        raising, so callers only have to inspect the second element.
    """
    error = ({"Error": "Unable to read file"}, 503)
    try:
        with open(path, encoding="utf-8") as handle:
            data = json.load(handle)
    except (OSError, ValueError):
        # json.JSONDecodeError is a ValueError subclass.
        return error
    if not data:
        # An empty document is treated the same as an unreadable file.
        return error
    return data, None


def get_neighbour_cell_list_for_cell_id():
    """Return ``(data, status)`` loaded from ``cell_list.json``."""
    return _load_json_file('cell_list.json')


def get_pci_for_cell_id():
    """Return ``(data, status)`` loaded from ``pci_value.json``."""
    return _load_json_file('pci_value.json')


def get_cell_data_for_cell_id():
    """Return ``(data, status)`` loaded from ``cell_data.json``."""
    return _load_json_file('cell_data.json')


def get_oof_sync_response():
    """Return ``(data, status)`` loaded from ``oof_syn_response.json``."""
    return _load_json_file('oof_syn_response.json')


def get_oof_async_response(callback_url, transaction_id):
    """POST the content of ``oof_async_response.json`` to *callback_url*.

    Sleeps for 10 seconds, loads the file, sets its ``transactionId`` to
    *transaction_id* and posts the result as JSON.

    Returns:
        The ``requests`` response object on success, or the
        ``(error_dict, 503)`` tuple when the file could not be loaded.
    """
    time.sleep(10)
    data, status = _load_json_file('oof_async_response.json')
    if status:
        # Check before touching the payload: adding the transaction id first
        # would make an empty document look valid.
        return data, status
    data['transactionId'] = transaction_id
    res = requests.post(callback_url, json=data)
    print('response from server:', res.text)
    return res


# NOTE(review): the URL variables below were reconstructed from the view
# signatures; confirm the segment order against the clients of this server.
@app.route("/api/sdnc-config-db/v3/getNbrList/<cell_id>/<ts>", methods=["GET"])
def get_neighbour_list(cell_id, ts):
    """Serve the content of ``cell_list.json``.

    *cell_id* and *ts* are taken from the URL but are not used; the same file
    content is returned for every request. Responds with 503 when the file
    cannot be loaded.
    """
    data, status = get_neighbour_cell_list_for_cell_id()
    if not status:
        return data
    return data, 503


@app.route("/api/sdnc-config-db/v3/getPCI/<cell_id>/<ts>", methods=["GET"])
def get_pci(cell_id, ts):
    """Serve the content of ``pci_value.json``.

    *cell_id* and *ts* are taken from the URL but are not used. Responds with
    503 when the file cannot be loaded.
    """
    data, status = get_pci_for_cell_id()
    if not status:
        return data
    return data, 503


@app.route("/api/sdnc-config-db/v3/getPnfId/<cell_id>/<ts>", methods=["GET"])
def get_pnf_id(cell_id, ts):
    """Serve ``pci_value.json`` with its ``value`` replaced by ``'ncserver5'``.

    *cell_id* and *ts* are taken from the URL but are not used. Responds with
    503 when the file cannot be loaded.
    """
    data, status = get_pci_for_cell_id()
    if status:
        # Leave the error payload untouched.
        return data, 503
    data['value'] = 'ncserver5'
    return data


@app.route("/api/sdnc-config-db/v3/getCell/<cell_id>", methods=["GET"])
def get_cell_data(cell_id):
    """Serve the content of ``cell_data.json``.

    *cell_id* is taken from the URL but is not used. Responds with 503 when
    the file cannot be loaded.
    """
    data, status = get_cell_data_for_cell_id()
    if not status:
        return data
    return data, 503
@app.route("/api/oof/v1/pci", methods=["POST"])
def oof_optimizatio_result():
    """Accept an OOF PCI request and answer it in two steps.

    Reads ``requestInfo.callbackUrl`` and ``requestInfo.transactionId`` from
    the JSON request body, starts a daemon thread running
    ``get_oof_async_response`` with those two values, and immediately returns
    the result of ``get_oof_sync_response``.

    Returns:
        ``(data, 202)`` when the synchronous response was loaded,
        ``(data, 503)`` when it was not, and ``(error_dict, 400)`` when the
        request body does not carry the two ``requestInfo`` fields.
    """
    content = request.get_json()
    try:
        request_info = content['requestInfo']
        callback_url = request_info['callbackUrl']
        transaction_id = request_info['transactionId']
    except (KeyError, TypeError):
        # TypeError covers a missing or non-object JSON body.
        return {"Error": "requestInfo.callbackUrl and requestInfo.transactionId are required"}, 400

    try:
        task = threading.Thread(
            target=get_oof_async_response,
            args=(callback_url, transaction_id),
        )
        task.daemon = True
        task.start()
    except RuntimeError:
        # Best effort: the synchronous answer is still returned when the
        # callback thread cannot be started.
        print("Error: Unable to start thread")

    data, status = get_oof_sync_response()
    if not status:
        return data, 202
    return data, 503


if __name__ == "__main__":
    # One blocking call only: a second app.run() would not start until this
    # server had exited.
    app.run(host='0.0.0.0')