import json
-
+import re
import docker
import time
+import datetime
class PrhLibrary(object):
+ ROBOT_LIBRARY_SCOPE = 'TEST SUITE'
+ ROBOT_LISTENER_API_VERSION = 2
def __init__(self):
- pass
+ self.ROBOT_LIBRARY_LISTENER = self
+
+ def _start_test(self, name, attrs):
+ # http://robotframework.org/robotframework/latest/RobotFrameworkUserGuide.html#test-libraries-as-listeners
+ self.test_start_time = self.get_current_utc_datetime()
@staticmethod
- def check_for_log(search_for):
- client = docker.from_env()
- container = client.containers.get('prh')
- for line in container.logs(stream=True):
- if search_for in line.strip():
+ def get_current_utc_datetime():
+ return datetime.datetime.utcnow()
+
+ def get_docker_logs_since_test_start(self, container_id):
+ return self.get_docker_logs(container_id, self.test_start_time)
+
+ @staticmethod
+ def get_docker_logs(container_id, since=None):
+ container = PrhLibrary.__get_docker_container(container_id)
+ return container.logs(stream=False, since=since)
+
+ def wait_for_one_of_docker_log_entries(self, container_id, searched_entries):
+ print("Looking for: %s" % searched_entries)
+ container = PrhLibrary.__get_docker_container(container_id)
+ print("Log lines:")
+ for line in container.logs(stream=True, since=self.test_start_time):
+ print(line)
+ for searched_entry in searched_entries:
+ if searched_entry in line.decode(encoding='utf-8').strip():
+ return True
+ else:
+ return False
+
+ def wait_for_log_entry_with_json_message(self, prefix, json_message):
+ print("Looking for:")
+ print("Prefix: " + str(prefix))
+ print("Json: " + str(json_message))
+ try:
+ decoded_message = json.loads(json_message)
+ except json.JSONDecodeError:
+ print("Could not decode given message")
+ return False
+ pattern = re.compile(prefix + "(.*)$")
+ container = PrhLibrary.__get_docker_container('prh')
+ print("Log lines:")
+ for line in container.logs(stream=True, since=self.test_start_time):
+ print(line)
+ if PrhLibrary.__same_json_in_log(decoded_message, line, pattern):
return True
else:
return False
@staticmethod
- def create_pnf_ready_notification(json_file):
- json_to_python = json.loads(json_file)
- ipv4 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV4IpAddress")
- ipv6 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in json_to_python["event"]["pnfRegistrationFields"] else ""
- serial_number = json_to_python.get("event").get("pnfRegistrationFields").get("serial-number") if "serial-number" in json_to_python["event"]["pnfRegistrationFields"] else ""
- equip_vendor = json_to_python.get("event").get("pnfRegistrationFields").get("equip-vendor") if "equip-vendor" in json_to_python["event"]["pnfRegistrationFields"] else ""
- equip_model = json_to_python.get("event").get("pnfRegistrationFields").get("equip-model") if "equip-model" in json_to_python["event"]["pnfRegistrationFields"] else ""
- equip_type = json_to_python.get("event").get("pnfRegistrationFields").get("equip-type") if "equip-type" in json_to_python["event"]["pnfRegistrationFields"] else ""
- nf_role = json_to_python.get("event").get("pnfRegistrationFields").get("nf-role") if "nf-role" in json_to_python["event"]["pnfRegistrationFields"] else ""
- sw_version = json_to_python.get("event").get("pnfRegistrationFields").get("sw-version") if "sw-version" in json_to_python["event"]["pnfRegistrationFields"] else ""
- correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName")
- str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + equip_vendor + '","equip-model":"' + equip_model + '","equip-type":"' + equip_type + '","nf-role":"' + nf_role + '","sw-version":"' + sw_version + '"}'
- python_to_json = json.dumps(str_json)
- return python_to_json.replace("\\", "")[1:-1]
+ def create_invalid_notification(json_file):
+ output = {}
+ input = json.loads(json_file)
+ output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
+ output["oamV4IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV4IpAddress")
+ output["oamV6IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV6IpAddress")
+ output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
+ output["vendorName"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
+ output["modelNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
+ output["unitType"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
+ output['nfNamingCode'] = ''
+ output['softwareVersion'] = ''
+
+ output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
+
+ return json.dumps(output)
@staticmethod
- def create_pnf_name(json_file):
- json_to_python = json.loads(json_file)
- correlation_id = json_to_python.get("sourceName")
- return correlation_id
+ def create_pnf_ready_notification_as_pnf_ready(json_file):
+ output = {}
+ input = json.loads(json_file)[0]
+
+ output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
+ output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
+ output["equip-vendor"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
+ output["equip-model"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
+ output["equip-type"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
+ output["nf-role"] = PrhLibrary.__extract_nf_role(input)
+ output["sw-version"] = ""
+
+ output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
+
+ return json.dumps(output)
@staticmethod
def ensure_container_is_running(name):
def is_in_status(client, name, status):
return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
+ @staticmethod
+ def create_pnf_name(json_file):
+ json_to_python = json.loads(json_file)
+ correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
+ return correlation_id
+
+ @staticmethod
+ def __get_additional_fields_as_key_value_pairs(content):
+ return content.get("event").get("pnfRegistrationFields").get(
+ "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else {}
+
+ @staticmethod
+ def __extract_value_from_pnfRegistrationFields(content, key):
+ return content["event"]["pnfRegistrationFields"][key] if key in content["event"]["pnfRegistrationFields"] else ''
+
+ @staticmethod
+ def __extract_correlation_id_value(content):
+ return content["event"]["commonEventHeader"]["sourceName"] if "sourceName" in content["event"]["commonEventHeader"] else ''
+
+ @staticmethod
+ def __extract_nf_role(content):
+ return content["event"]["commonEventHeader"]["nfNamingCode"] if "nfNamingCode" in content["event"]["commonEventHeader"] else ''
- def create_invalid_notification(self, json_file):
- return self.create_pnf_ready_notification(json_file).replace("\":", "\": ")\
- .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam", "oamV6IpAddress")\
- .replace("}", "\\n}")
+ @staticmethod
+ def __same_json_in_log(decoded_message, line, pattern):
+ extracted_json = PrhLibrary.__extract_json(line, pattern)
+ if extracted_json is not None:
+ print("Found json: " + extracted_json)
+ try:
+ if json.loads(extracted_json) == decoded_message:
+ return True
+ except json.JSONDecodeError:
+ print("Could not decode")
+ return False
+
+ @staticmethod
+ def __extract_json(line, pattern):
+ full_message = PrhLibrary.__extract_full_message_from_line(line)
+ if full_message is not None:
+ match = pattern.match(full_message.decode(encoding='utf-8'))
+ if match:
+ return match.group(1).replace("\\n", "\n").replace("\\t", "\t")
+ return None
+
+ @staticmethod
+ def __extract_full_message_from_line(line):
+ split = line.split(bytes('|', 'utf-8'))
+ if len(split) > 3:
+ return split[3]
+ return None
+
+ @staticmethod
+ def __get_docker_container(container_id):
+ docker_client = docker.from_env()
+ return docker_client.containers.get(container_id)