2 Created on Aug 18, 2017
6 from robot.api import logger
7 from Queue import Queue
21 class DcaeLibrary(object):
27 def setup_dmaap_server(port_num=3904):
28 if DcaeVariables.HttpServerThread is not None:
29 DMaaP.clean_up_event()
30 logger.console("Clean up event from event queue before test")
31 logger.info("DMaaP Server already started")
34 DcaeVariables.IsRobotRun = True
35 DMaaP.test(port=port_num)
37 DcaeVariables.VESEventQ = Queue()
38 DcaeVariables.HttpServerThread = threading.Thread(name='DMAAP_HTTPServer', target=DMaaP.DMaaPHttpd.serve_forever)
39 DcaeVariables.HttpServerThread.start()
40 logger.console("DMaaP Mockup Sever started")
43 except Exception as e:
49 if DcaeVariables.HTTPD is not None:
50 DcaeVariables.HTTPD.shutdown()
51 logger.console("DMaaP Server shut down")
58 def cleanup_ves_events():
59 if DcaeVariables.HttpServerThread is not None:
60 DMaaP.clean_up_event()
61 logger.console("DMaaP event queue is cleaned up")
63 logger.console("DMaaP server not started yet")
67 def enable_vesc_with_certBasicAuth():
69 if 'Windows' in platform.system():
71 client = paramiko.SSHClient()
72 client.load_system_host_keys()
73 # client.set_missing_host_key_policy(paramiko.WarningPolicy)
74 client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
76 client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'], password=os.environ['CSIT_PD'])
77 stdin, stdout, stderr = client.exec_command('%{WORKSPACE}/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh')
78 logger.console(stdout.read())
82 ws = os.environ['WORKSPACE']
83 script2run = ws + "/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh"
84 logger.info("Running script: " + script2run)
85 logger.console("Running script: " + script2run)
86 subprocess.call(script2run)
91 def dmaap_message_receive(evtobj, action='contain'):
93 evt_str = DMaaP.deque_event()
94 while evt_str != None:
95 if action == 'contain':
97 logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
99 if action == 'sizematch':
100 if len(evtobj) == len(evt_str):
102 if action == 'dictmatch':
103 evt_dict = json.loads(evt_str)
104 if cmp(evtobj, evt_dict) == 0:
106 evt_str = DMaaP.deque_event()
110 def is_json_empty(resp):
111 logger.info("Enter is_json_empty: resp.text: " + resp.text)
112 if resp.text is None or len(resp.text) < 2:
118 """generate a uuid"""
122 def get_json_value_list(jsonstr, keyval):
123 logger.info("Enter Get_Json_Key_Value_List")
124 if jsonstr is None or len(jsonstr) < 2:
125 logger.info("No Json data found")
128 data = json.loads(jsonstr)
131 nodelist.append(item[keyval])
133 except Exception as e:
134 logger.info("Json data parsing fails")
139 def generate_millitimestamp_uuid():
140 """generate a millisecond timestamp uuid"""
141 then = datetime.datetime.now()
142 return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3)
147 from pprint import pprint
149 with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:
150 data = json.load(data_file)
152 data['event']['commonEventHeader']['version'] = '5.0'
156 if __name__ == '__main__':
158 dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284"
160 #dict = cls.create_header_from_string(dictStr)
162 jsonStr = "[{'Node': 'onapfcnsl00', 'CheckID': 'serfHealth', 'Name': 'Serf Health Status', 'ServiceName': '', 'Notes': '', 'ModifyIndex': 6, 'Status': 'passing', 'ServiceID': '', 'ServiceTags': [], 'Output': 'Agent alive and reachable', 'CreateIndex': 6}]"
163 lsObj = cls.get_json_value_list(jsonStr, 'Status')
168 lib.enable_vesc_https_auth()
170 ret = lib.setup_dmaap_server()