Bbs test cases are not run while bbs code is merged
[integration/csit.git] / tests / dcaegen2 / bbs-testcases / resources / BbsLibrary.py
diff --git a/tests/dcaegen2/bbs-testcases/resources/BbsLibrary.py b/tests/dcaegen2/bbs-testcases/resources/BbsLibrary.py
deleted file mode 100644 (file)
index 8dbdc5a..0000000
+++ /dev/null
@@ -1,173 +0,0 @@
-import json
-
-import docker
-import time
-from docker.utils.json_stream import json_stream
-from collections import OrderedDict
-
-
-class BbsLibrary(object):
-
-    def __init__(self):
-        pass
-
-    @staticmethod
-    def check_for_log(search_for):
-        client = docker.from_env()
-        container = client.containers.get('bbs')
-
-        alog = container.logs(stream=False, tail=1000)
-        try:
-            alog = alog.decode()
-        except AttributeError:
-            pass
-
-        found = alog.find(search_for)
-        if found != -1:
-            return True
-        else:
-            return False
-
-    @staticmethod
-    def create_pnf_name_from_auth(json_file):
-        json_to_python = json.loads(json_file)
-        correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName")
-        return correlation_id
-
-    @staticmethod
-    def get_invalid_auth_elements(json_file):
-        """
-        Get the correlationId, oldState, newState, stateInterface, macAddress, swVersion elements
-        from the invalid message and place the elements into a JSON object (string) as fields for comparision
-        """
-        json_to_python = json.loads(json_file)
-        correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName")
-        oldState = json_to_python.get("event").get("stateChangeFields").get("oldState")
-        newState = json_to_python.get("event").get("stateChangeFields").get("newState")
-        stateInterface = json_to_python.get("event").get("stateChangeFields").get("stateInterface")
-        macAddress = json_to_python.get("event").get("stateChangeFields").get("additionalFields").get("macAddress")
-        swVersion = json_to_python.get("event").get("stateChangeFields").get("additionalFields").get("swVersion")
-        if swVersion is None:
-            swVersion = ""
-        
-        inv_fields = OrderedDict()
-
-        #inv_fields = dict()
-        inv_fields['correlationId'] = correlation_id
-        inv_fields['oldState'] = oldState
-        inv_fields['newState'] = newState
-        inv_fields['stateInterface'] = stateInterface
-        inv_fields['macAddress'] = macAddress
-        inv_fields['swVersion'] = swVersion
-        
-        # Transform the dictionary to JSON string
-        json_str = json.dumps(inv_fields)
-        
-        # Need to remove spaces between elements
-        json_str = json_str.replace(', ', ',')
-        return json_str
-
-    @staticmethod
-    def get_invalid_update_elements(json_file):
-        """
-        Get the correlationId, attachment-point, remote-id, cvlan, svlan, elements
-        from the invalid message and place the elements into a JSON object (string) as fields for comparision
-        """
-        json_to_python = json.loads(json_file)
-        correlation_id = json_to_python.get("correlationId")
-        attachmentPoint = json_to_python.get("additionalFields").get("attachment-point")
-        remoteId = json_to_python.get("additionalFields").get("remote-id")
-        cvlan = json_to_python.get("additionalFields").get("cvlan")
-        svlan = json_to_python.get("additionalFields").get("svlan")
-        
-        inv_fields = OrderedDict()
-        #inv_fields = dict()
-        inv_fields['correlationId'] = correlation_id
-        inv_fields['attachment-point'] = attachmentPoint
-        inv_fields['remote-id'] = remoteId
-        inv_fields['cvlan'] = cvlan
-        inv_fields['svlan'] = svlan
-        
-        # Transform the dictionary to JSON string
-        json_str = json.dumps(inv_fields)
-        
-        # Need to remove spaces between elements
-        json_str = json_str.replace(', ', ',')
-        return json_str
-
-    @staticmethod
-    def compare_policy(dmaap_policy, json_policy):
-        resp = False
-        try:
-            python_policy = json.loads(json_policy).pop()
-        except:
-            python_policy = ""
-        
-        try:
-            python_dmaap_policy = json.loads(dmaap_policy)
-        except:
-            python_dmaap_policy = ""
-
-        try:
-            d_policy = python_dmaap_policy.get("policyName")
-        except:
-            d_policy = ""
-
-        try:
-            j_policy = python_policy.get("policyName")
-        except:
-            return "False"
-        
-        resp = "False"
-        if (d_policy == j_policy):
-            resp = "True"
-        return resp
-
-    @staticmethod
-    def create_pnf_name_from_update(json_file):
-        json_to_python = json.loads(json_file)
-        correlation_id = json_to_python.get("correlationId")
-        return correlation_id
-
-    @staticmethod
-    def ensure_container_is_running(name):
-        
-        client = docker.from_env()
-
-        if not BbsLibrary.is_in_status(client, name, "running"):
-            print ("starting container", name)
-            container = client.containers.get(name)
-            container.start()
-            BbsLibrary.wait_for_status(client, name, "running")
-
-        BbsLibrary.print_status(client)
-
-    @staticmethod
-    def ensure_container_is_exited(name):
-
-        client = docker.from_env()
-
-        if not BbsLibrary.is_in_status(client, name, "exited"):
-            print ("stopping container", name)
-            container = client.containers.get(name)
-            container.stop()
-            BbsLibrary.wait_for_status(client, name, "exited")
-
-        BbsLibrary.print_status(client)
-
-    @staticmethod
-    def print_status(client):
-        print("containers status")
-        for c in client.containers.list(all=True):
-            print(c.name, "   ", c.status)
-
-    @staticmethod
-    def wait_for_status(client, name, status):
-        while not BbsLibrary.is_in_status(client, name, status):
-            print ("waiting for container: ", name, "to be in status: ", status)
-            time.sleep(3)
-
-    @staticmethod
-    def is_in_status(client, name, status):
-        return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
-