1 # ============LICENSE_START=======================================================
2 # Copyright (C) 2021 Samsung
3 # Modification Copyright (C) 2022 Deutsche Telekom AG
4 # ================================================================================
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # ============LICENSE_END=========================================================
import logging
import os
import sys
import zipfile
from io import BytesIO

from onapsdk.aai.business import Customer
from onapsdk.cds.blueprint import Workflow, Blueprint

from config import Config, VariablesDict
# FIXME: remove from global scope
# Configure the root logger so that INFO-level progress messages emitted by
# this script are actually written out through a stream handler.
logger = logging.getLogger("")
logger.setLevel(logging.INFO)
fh = logging.StreamHandler()
fh_formatter = logging.Formatter('%(asctime)s %(levelname)s %(lineno)d:%(filename)s(%(process)d) - %(message)s')
fh.setFormatter(fh_formatter)
# Without attaching the handler the formatter above is never used and, with
# no handler on the root logger, only the WARNING-level last-resort handler
# runs -- every logger.info() call would be silently dropped.
logger.addHandler(fh)
def resolve_hc_inputs(config: Config):
    """Resolve the identifiers needed as inputs of the health-check workflow.

    Looks up the customer given by ``config.service_instance["customer_id"]``,
    selects the service subscription whose ``service_type`` equals
    ``config.service_model["model_name"]``, then the service instance whose
    ``instance_name`` equals ``config.service_instance["instance_name"]``.

    Args:
        config: Configuration object exposing the ``service_instance`` and
            ``service_model`` mappings read above.

    Returns:
        A ``(service_id, vnf_id)`` tuple: the ``instance_id`` of the matched
        service instance and the ``vnf_id`` of its single VNF instance.

    Raises:
        NotImplementedError: If the service instance has more than one VNF.
        Exception: If the customer, the service subscription, the service
            instance or its VNF cannot be found.
    """
    logger.info("******** Check Customer *******")
    customer_id = config.service_instance["customer_id"]
    customer = Customer.get_by_global_customer_id(customer_id)
    # NOTE(review): this guard assumes the lookup may return None for an
    # unknown customer; if the SDK raises instead, the check is simply never
    # true -- confirm against the onapsdk version in use.
    if customer is None:
        raise Exception("Customer %s wasn't found in ONAP" % customer_id)

    logger.info("******** Check Service Subscription *******")
    model_name = config.service_model["model_name"]
    service_subscription = None
    for service_sub in customer.service_subscriptions:
        logger.debug("Service subscription %s is found", service_sub.service_type)
        if service_sub.service_type == model_name:
            logger.info("Service %s subscribed", model_name)
            service_subscription = service_sub
            break  # first match wins; no need to scan the remaining ones
    if service_subscription is None:
        # Fail with a clear message instead of an AttributeError on None below.
        raise Exception("Service %s is not subscribed by customer %s"
                        % (model_name, customer_id))

    logger.info("******** Retrieve Service Metadata *******")
    instance_name = config.service_instance["instance_name"]
    service_instance = next(
        (candidate for candidate in service_subscription.service_instances
         if candidate.instance_name == instance_name),
        None,
    )
    if service_instance is None:
        raise Exception("Service instance %s wasn't found in ONAP" % instance_name)

    service_id = service_instance.instance_id
    vnfs = list(service_instance.vnf_instances)
    if len(vnfs) > 1:
        raise NotImplementedError("Service %s is composed of more than one vnf!" % service_id)
    if not vnfs:
        raise Exception("Service %s doesn't contain any vnfs" % service_id)
    return service_id, vnfs[0].vnf_id
def main(status_count):
    """Execute the ``health-check`` CDS workflow for each VNF of the service model.

    For every entry of ``config.service_model["vnfs"]`` the VSP package named
    by ``vnf["vsp"]["vsp_file"]`` (resolved relative to this script's
    directory) is opened, the ``CBA.zip`` archive it contains is loaded as a
    blueprint, and the blueprint's ``health-check`` workflow is executed with
    the identifiers returned by ``resolve_hc_inputs``.

    Args:
        status_count: Value sent as ``status-check-max-count``; it is
            converted with ``int()``, so a numeric string is accepted.

    Raises:
        ValueError: If ``status_count`` cannot be converted to ``int``.
    """
    # Validate the argument up front so a bad value fails before any package
    # is opened or any remote call is made.
    max_count = int(status_count)
    mypath = os.path.dirname(os.path.realpath(__file__))
    config = Config(env_dict=VariablesDict.env_variable)
    for vnf in config.service_model["vnfs"]:
        vsp_path = os.path.join(mypath, vnf["vsp"]["vsp_file"])
        with zipfile.ZipFile(vsp_path, 'r') as package:
            # ZipFile.read() already returns the member's bytes; no
            # intermediate in-memory stream is needed.
            blueprint = Blueprint(package.read("CBA.zip"))

        healthcheck: Workflow = blueprint.get_workflow_by_name('health-check')
        serv_id, vnf_id = resolve_hc_inputs(config)
        cds_input = {
            "health-check-properties": {
                "service-instance-id": serv_id,
                # NOTE(review): vnf_id is resolved above, but the key it is
                # sent under is not visible in the reviewed text; "vnf-id"
                # mirrors the sibling key's naming -- confirm against the
                # CBA's health-check workflow input definition.
                "vnf-id": vnf_id,
                "status-check-max-count": max_count,
            }
        }

        logger.info("Requesting Healthcheck for CBA %s:%s with inputs:\n%s",
                    blueprint.metadata.template_name,
                    blueprint.metadata.template_version,
                    cds_input)
        result = healthcheck.execute(cds_input)
        logger.info("Healthcheck process completed with result: %s", result)
        logger.info("Please check cds-blueprints-processor logs to see exact status")
if __name__ == "__main__":
    # The optional first command-line argument is the maximum number of
    # status checks. Fall back to a single check when it is omitted rather
    # than crashing with IndexError.
    status_count = sys.argv[1] if len(sys.argv) > 1 else 1
    print(f"Status Check Max Count: {status_count}")
    main(status_count)