X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=tests%2Fdcaegen2%2Fprh-testcases%2Fresources%2FPrhLibrary.py;h=88aeb45c3c47fc3da697014e08800d96588c5baa;hb=8a0cfc9b928b57d1e950774d6a7e4ad8d9a81fac;hp=d413be58c8dca2b6138d17fcd8a6c9182d4a366b;hpb=de57b6bcc922d933af34f7b3088c9b7a02129a16;p=integration%2Fcsit.git diff --git a/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py b/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py index d413be58..88aeb45c 100644 --- a/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py +++ b/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py @@ -10,35 +10,83 @@ class PrhLibrary(object): pass @staticmethod - def check_for_log(search_for): + def find_log_entry(search_for): + print (type(search_for)) client = docker.from_env() container = client.containers.get('prh') + print ("Check for log searches for pattern: ", search_for ) for line in container.logs(stream=True): + print ("Check for log analysis line: ", line ) if search_for in line.strip(): return True else: return False @staticmethod - def create_pnf_ready_notification(json_file): + def create_invalid_notification(json_file): + event = json.loads(json_file)[0] + correlation_id = PrhLibrary.extract_correlation_id_value(event, "correlationId") + ipv4 = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "oamV4IpAddress", "oamV4IpAddress") + ipv6 = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "oamV6IpAddress", "oamV6IpAddress") + serial_number = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "serialNumber", "serialNumber") + vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "vendorName", "vendorName") + model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "modelNumber", "modelNumber") + unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "unitType", "unitType") + + additional_fields = PrhLibrary.extract_additional_fields(event) + + str_json = '{' + correlation_id + ipv4 + ipv6 + serial_number + vendor_name + model_number + unit_type + '"nfNamingCode":""' + "," + '"softwareVersion":"",' + additional_fields + return json.dumps(str_json).replace("\\", "")[1:-1].replace("\":", "\": ").rstrip(',') + '\\n}' + + @staticmethod + def create_pnf_ready_notification_as_pnf_ready(json_file): json_to_python = json.loads(json_file) - ipv4 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV4IpAddress") - ipv6 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in json_to_python["event"]["pnfRegistrationFields"] else "" - serial_number = json_to_python.get("event").get("pnfRegistrationFields").get("serial-number") if "serial-number" in json_to_python["event"]["pnfRegistrationFields"] else "" - equip_vendor = json_to_python.get("event").get("pnfRegistrationFields").get("equip-vendor") if "equip-vendor" in json_to_python["event"]["pnfRegistrationFields"] else "" - equip_model = json_to_python.get("event").get("pnfRegistrationFields").get("equip-model") if "equip-model" in json_to_python["event"]["pnfRegistrationFields"] else "" - equip_type = json_to_python.get("event").get("pnfRegistrationFields").get("equip-type") if "equip-type" in json_to_python["event"]["pnfRegistrationFields"] else "" - nf_role = json_to_python.get("event").get("pnfRegistrationFields").get("nf-role") if "nf-role" in json_to_python["event"]["pnfRegistrationFields"] else "" - sw_version = json_to_python.get("event").get("pnfRegistrationFields").get("sw-version") if "sw-version" in json_to_python["event"]["pnfRegistrationFields"] else "" - correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") - str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + equip_vendor + '","equip-model":"' + equip_model + '","equip-type":"' + equip_type + '","nf-role":"' + nf_role + '","sw-version":"' + sw_version + '"}' - python_to_json = json.dumps(str_json) - return python_to_json.replace("\\", "")[1:-1] + correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId") + + additional_fields = PrhLibrary.extract_additional_fields_value(json_to_python) + + str_json = '{' + correlation_id + additional_fields + + return json.dumps(str_json.rstrip(',') + '}').replace("\\", "")[1:-1] + + @staticmethod + def extract_additional_fields_value(content): + fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content) + if len(fields) == 0: + return "" + return PrhLibrary.build_additional_fields_json(fields) + + @staticmethod + def extract_additional_fields(content): + fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content) + if fields == []: + return '"additionalFields":null' + return PrhLibrary.build_additional_fields_json(fields) + + @staticmethod + def get_additional_fields_as_key_value_pairs(content): + return content.get("event").get("pnfRegistrationFields").get( + "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else [] + + @staticmethod + def build_additional_fields_json(fields): + res = '"additionalFields":{' + for f in fields: + res += '"' + f + '":"' + fields.get(f) + '",' + return res.rstrip(',') + '},' + + @staticmethod + def extract_value_from_pnfRegistrationFields(content, name, key): + return '"' + name + '":"' + (content.get("event").get("pnfRegistrationFields").get(key) + '",' if key in content["event"]["pnfRegistrationFields"] else '",') + + @staticmethod + def extract_correlation_id_value(content, name): + return '"' + name + '":"' + (content.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in content["event"]["commonEventHeader"] else '",') @staticmethod def create_pnf_name(json_file): json_to_python = json.loads(json_file) - correlation_id = json_to_python.get("sourceName") + correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",' return correlation_id @staticmethod @@ -79,10 +127,4 @@ class PrhLibrary(object): @staticmethod def is_in_status(client, name, status): - return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1 - - - def create_invalid_notification(self, json_file): - return self.create_pnf_ready_notification(json_file).replace("\":", "\": ")\ - .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam", "oamV6IpAddress")\ - .replace("}", "\\n}") + return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1 \ No newline at end of file