+++ /dev/null
-import json
-
-import docker
-import time
-from docker.utils.json_stream import json_stream
-from collections import OrderedDict
-
-
-class BbsLibrary(object):
-
- def __init__(self):
- pass
-
- @staticmethod
- def check_for_log(search_for):
- client = docker.from_env()
- container = client.containers.get('bbs')
-
- alog = container.logs(stream=False, tail=1000)
- try:
- alog = alog.decode()
- except AttributeError:
- pass
-
- found = alog.find(search_for)
- if found != -1:
- return True
- else:
- return False
-
- @staticmethod
- def create_pnf_name_from_auth(json_file):
- json_to_python = json.loads(json_file)
- correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName")
- return correlation_id
-
- @staticmethod
- def get_invalid_auth_elements(json_file):
- """
- Get the correlationId, oldState, newState, stateInterface, macAddress, swVersion elements
- from the invalid message and place the elements into a JSON object (string) as fields for comparision
- """
- json_to_python = json.loads(json_file)
- correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName")
- oldState = json_to_python.get("event").get("stateChangeFields").get("oldState")
- newState = json_to_python.get("event").get("stateChangeFields").get("newState")
- stateInterface = json_to_python.get("event").get("stateChangeFields").get("stateInterface")
- macAddress = json_to_python.get("event").get("stateChangeFields").get("additionalFields").get("macAddress")
- swVersion = json_to_python.get("event").get("stateChangeFields").get("additionalFields").get("swVersion")
- if swVersion is None:
- swVersion = ""
-
- inv_fields = OrderedDict()
-
- #inv_fields = dict()
- inv_fields['correlationId'] = correlation_id
- inv_fields['oldState'] = oldState
- inv_fields['newState'] = newState
- inv_fields['stateInterface'] = stateInterface
- inv_fields['macAddress'] = macAddress
- inv_fields['swVersion'] = swVersion
-
- # Transform the dictionary to JSON string
- json_str = json.dumps(inv_fields)
-
- # Need to remove spaces between elements
- json_str = json_str.replace(', ', ',')
- return json_str
-
- @staticmethod
- def get_invalid_update_elements(json_file):
- """
- Get the correlationId, attachment-point, remote-id, cvlan, svlan, elements
- from the invalid message and place the elements into a JSON object (string) as fields for comparision
- """
- json_to_python = json.loads(json_file)
- correlation_id = json_to_python.get("correlationId")
- attachmentPoint = json_to_python.get("additionalFields").get("attachment-point")
- remoteId = json_to_python.get("additionalFields").get("remote-id")
- cvlan = json_to_python.get("additionalFields").get("cvlan")
- svlan = json_to_python.get("additionalFields").get("svlan")
-
- inv_fields = OrderedDict()
- #inv_fields = dict()
- inv_fields['correlationId'] = correlation_id
- inv_fields['attachment-point'] = attachmentPoint
- inv_fields['remote-id'] = remoteId
- inv_fields['cvlan'] = cvlan
- inv_fields['svlan'] = svlan
-
- # Transform the dictionary to JSON string
- json_str = json.dumps(inv_fields)
-
- # Need to remove spaces between elements
- json_str = json_str.replace(', ', ',')
- return json_str
-
- @staticmethod
- def compare_policy(dmaap_policy, json_policy):
- resp = False
- try:
- python_policy = json.loads(json_policy).pop()
- except:
- python_policy = ""
-
- try:
- python_dmaap_policy = json.loads(dmaap_policy)
- except:
- python_dmaap_policy = ""
-
- try:
- d_policy = python_dmaap_policy.get("policyName")
- except:
- d_policy = ""
-
- try:
- j_policy = python_policy.get("policyName")
- except:
- return "False"
-
- resp = "False"
- if (d_policy == j_policy):
- resp = "True"
- return resp
-
- @staticmethod
- def create_pnf_name_from_update(json_file):
- json_to_python = json.loads(json_file)
- correlation_id = json_to_python.get("correlationId")
- return correlation_id
-
- @staticmethod
- def ensure_container_is_running(name):
-
- client = docker.from_env()
-
- if not BbsLibrary.is_in_status(client, name, "running"):
- print ("starting container", name)
- container = client.containers.get(name)
- container.start()
- BbsLibrary.wait_for_status(client, name, "running")
-
- BbsLibrary.print_status(client)
-
- @staticmethod
- def ensure_container_is_exited(name):
-
- client = docker.from_env()
-
- if not BbsLibrary.is_in_status(client, name, "exited"):
- print ("stopping container", name)
- container = client.containers.get(name)
- container.stop()
- BbsLibrary.wait_for_status(client, name, "exited")
-
- BbsLibrary.print_status(client)
-
- @staticmethod
- def print_status(client):
- print("containers status")
- for c in client.containers.list(all=True):
- print(c.name, " ", c.status)
-
- @staticmethod
- def wait_for_status(client, name, status):
- while not BbsLibrary.is_in_status(client, name, status):
- print ("waiting for container: ", name, "to be in status: ", status)
- time.sleep(3)
-
- @staticmethod
- def is_in_status(client, name, status):
- return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
-