2 Created on Aug 18, 2017
6 from robot.api import logger
17 class DcaeLibrary(object):
# Entry point for overriding the collector properties: inspects the host OS via
# platform.system() and hands properties_path to one of the two platform helpers.
# NOTE(review): the numbering in this listing skips lines around the branch, so
# whatever joins the two calls (presumably an else) is not visible here --
# confirm against the full file before relying on this description.
23 def override_collector_properties(properties_path):
# Substring test: any platform.system() value containing 'Windows' matches.
25 if 'Windows' in platform.system():
27 DcaeLibrary.change_properties_for_windows_platform_system(properties_path)
31 DcaeLibrary.change_properties_for_non_windows_platform_system(properties_path)
def change_properties_for_non_windows_platform_system(properties_path):
    """Run the local override_collector_properties.sh script.

    The script is located under the directory named by the WORKSPACE
    environment variable and is executed directly (argument list, no shell)
    with properties_path as its only argument.

    :param properties_path: value handed to the script as its first argument.
    :raises KeyError: if the WORKSPACE environment variable is not set.
    :raises OSError: if the script cannot be started.
    """
    ws = os.environ['WORKSPACE']
    script2run = ws + '/tests/dcaegen2/testcases/resources/override_collector_properties.sh'
    logger.info("Running script: " + script2run)
    logger.console("Running script: " + script2run)
    exit_status = subprocess.call([script2run, properties_path])
    if exit_status != 0:
        # The exit status used to be discarded, so a failed override went
        # unnoticed. Report it; the return value (None) is unchanged.
        logger.warn("Script %s exited with status %s" % (script2run, exit_status))
def change_properties_for_windows_platform_system(properties_path):
    """Run override_collector_properties.sh on a remote host over SSH.

    The host, user and password are read from the CSIT_IP, CSIT_USER and
    CSIT_PD environment variables. Whatever the remote command writes to
    stdout is echoed to the console.

    :param properties_path: value handed to the remote script as its first
        argument.
    :raises KeyError: if one of the CSIT_* environment variables is not set.
    """
    # Function-scope import so this block does not rely on the file's import
    # list; pipes.quote is the same helper on interpreters that predate
    # shlex.quote.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    # NOTE(review): the previous command started with the literal text
    # '%{WORKSPACE}', which neither Python nor a shell expands. '${WORKSPACE}'
    # lets the remote shell substitute its own WORKSPACE variable -- confirm
    # that variable is defined for SSH sessions on the target host.
    remote_script = '"${WORKSPACE}/tests/dcaegen2/testcases/resources/override_collector_properties.sh"'
    command = remote_script + ' ' + quote(str(properties_path))

    client = paramiko.SSHClient()
    client.load_system_host_keys()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'],
                       password=os.environ['CSIT_PD'])
        # exec_command takes a single command string; its second positional
        # parameter is bufsize, so the script argument must be part of the
        # command line instead of a separate call argument.
        _stdin, stdout, _stderr = client.exec_command(command)
        logger.console(stdout.read())
    finally:
        # Release the SSH connection even when connecting or running fails.
        client.close()
# Inspects the ``text`` attribute of a response object; text that is None or
# shorter than two characters satisfies the condition below.
# NOTE(review): the branch body and the return value are not visible in this
# listing -- confirm them against the full file.
55 def is_json_empty(resp):
# NOTE(review): this concatenation runs before the None check on the next line,
# so a None resp.text raises TypeError here and never reaches that check.
56 logger.info("Enter is_json_empty: resp.text: " + resp.text)
57 if resp.text is None or len(resp.text) < 2:
# Collects the values stored under keyval from a JSON string by delegating to
# DcaeLibrary.extract_list_of_items_from_json_string.
# NOTE(review): this listing skips the lines holding the ``try:`` header and
# whatever is returned on the empty-input and parse-failure paths -- confirm
# those against the full file.
67 def get_json_value_list(jsonstr, keyval):
68 logger.info("Enter Get_Json_Key_Value_List")
# Input that is None or shorter than two characters is logged as "no data".
69 if jsonstr is None or len(jsonstr) < 2:
70 logger.info("No Json data found")
73 return DcaeLibrary.extract_list_of_items_from_json_string(jsonstr, keyval)
# Any Exception raised by the extraction is caught and logged at info level;
# the exception bound to ``e`` is not used on the visible line.
74 except Exception as e:
75 logger.info("Json data parsing fails")
# Parses jsonstr with json.loads and appends item[keyval] values to nodelist.
# NOTE(review): the lines that create nodelist, loop over the items and return
# the result are not visible in this listing -- presumably the loop runs over
# the parsed data; confirm against the full file.
80 def extract_list_of_items_from_json_string(jsonstr, keyval):
# json.loads raises ValueError (or a subclass) when jsonstr is not valid JSON.
81 data = json.loads(jsonstr)
# Raises KeyError if item is a dict that lacks keyval.
84 nodelist.append(item[keyval])
def generate_millitimestamp_uuid():
    """Return the current local time as whole milliseconds since the epoch."""
    now = datetime.datetime.now()
    # mktime works at one-second resolution, so the sub-second part is taken
    # from the datetime's microsecond field and added as milliseconds.
    epoch_seconds = time.mktime(now.timetuple())
    millis = epoch_seconds * 1e3 + now.microsecond / 1e3
    return int(millis)
# NOTE(review): the ``def`` line that owns these statements is not visible in
# this listing -- confirm which function they belong to.
96 from pprint import pprint
# Reads the JSON event file; the path is relative to the current working directory.
98 with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:
99 data = json.load(data_file)
# Changes the header version on the loaded dict; nothing visible here writes it
# back to the file.
101 data['event']['commonEventHeader']['version'] = '5.0'
# Script entry point: these statements run only when the module is executed
# directly rather than imported.
# NOTE(review): the line that binds ``lib`` is not visible in this listing --
# presumably a DcaeLibrary instance; confirm.
105 if __name__ == '__main__':
108 lib.enable_vesc_https_auth()
# The result is bound to ``ret``; any later use of it is not visible here.
110 ret = lib.setup_dmaap_server()