Merge "Add CONFIG_BINDING_SERVICE_SERVICE_PORT to env_local.sh"
[integration/csit.git] / tests / dcaegen2 / testcases / resources / DcaeLibrary.py
1 '''
2 Created on Aug 18, 2017
3
4 @author: sw6830
5 '''
6 from robot.api import logger
7 from Queue import Queue
8 import uuid
9 import time
10 import datetime
11 import json
12 import threading
13 import os
14 import platform
15 import subprocess
16 import paramiko
17 import DcaeVariables
18 import DMaaP
19
20
21 class DcaeLibrary(object):
22     
23     def __init__(self):
24         pass 
25     
26     @staticmethod
27     def setup_dmaap_server(port_num=3904):
28         if DcaeVariables.HttpServerThread is not None:
29             DMaaP.clean_up_event()
30             logger.console("Clean up event from event queue before test")
31             logger.info("DMaaP Server already started")
32             return "true"
33         
34         DcaeVariables.IsRobotRun = True
35         DMaaP.test(port=port_num)
36         try:
37             DcaeVariables.VESEventQ = Queue()
38             DcaeVariables.HttpServerThread = threading.Thread(name='DMAAP_HTTPServer', target=DMaaP.DMaaPHttpd.serve_forever)
39             DcaeVariables.HttpServerThread.start()
40             logger.console("DMaaP Mockup Sever started")
41             time.sleep(2)
42             return "true"
43         except Exception as e:
44             print (str(e))
45             return "false"
46             
47     @staticmethod
48     def shutdown_dmaap():
49         if DcaeVariables.HTTPD is not None:
50             DcaeVariables.HTTPD.shutdown()
51             logger.console("DMaaP Server shut down")
52             time.sleep(3)
53             return "true"
54         else:
55             return "false"
56             
57     @staticmethod
58     def cleanup_ves_events():
59         if DcaeVariables.HttpServerThread is not None:
60             DMaaP.clean_up_event()
61             logger.console("DMaaP event queue is cleaned up")
62             return "true"
63         logger.console("DMaaP server not started yet")
64         return "false"
65     
66     @staticmethod
67     def enable_vesc_with_certBasicAuth():
68         global client
69         if 'Windows' in platform.system():
70             try:
71                 client = paramiko.SSHClient()
72                 client.load_system_host_keys()
73                 # client.set_missing_host_key_policy(paramiko.WarningPolicy)
74                 client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
75                 
76                 client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'], password=os.environ['CSIT_PD'])
77                 stdin, stdout, stderr = client.exec_command('%{WORKSPACE}/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh')
78                 logger.console(stdout.read())    
79             finally:
80                 client.close()
81             return
82         ws = os.environ['WORKSPACE']
83         script2run = ws + "/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh"
84         logger.info("Running script: " + script2run)
85         logger.console("Running script: " + script2run)
86         subprocess.call(script2run)
87         time.sleep(5)
88         return
89
90     @staticmethod
91     def dmaap_message_receive(evtobj, action='contain'):
92         
93         evt_str = DMaaP.deque_event()
94         while evt_str != None:
95             if action == 'contain':
96                 if evtobj in evt_str:
97                     logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
98                     return 'true'
99             if action == 'sizematch':
100                 if len(evtobj) == len(evt_str):
101                     return 'true'
102             if action == 'dictmatch':
103                 evt_dict = json.loads(evt_str)
104                 if cmp(evtobj, evt_dict) == 0:
105                     return 'true'
106             evt_str = DMaaP.deque_event()
107         return 'false'
108
109     @staticmethod
110     def is_json_empty(resp):
111         logger.info("Enter is_json_empty: resp.text: " + resp.text)
112         if resp.text is None or len(resp.text) < 2:
113             return 'True'
114         return 'False'
115     
116     @staticmethod
117     def generate_uuid():
118         """generate a uuid"""
119         return uuid.uuid4()
120     
121     @staticmethod
122     def get_json_value_list(jsonstr, keyval):
123         logger.info("Enter Get_Json_Key_Value_List")
124         if jsonstr is None or len(jsonstr) < 2:
125             logger.info("No Json data found")
126             return []
127         try:
128             data = json.loads(jsonstr)   
129             nodelist = []
130             for item in data:
131                 nodelist.append(item[keyval])
132             return nodelist
133         except Exception as e:
134             logger.info("Json data parsing fails")
135             print str(e)
136             return []
137         
138     @staticmethod
139     def generate_millitimestamp_uuid():
140         """generate a millisecond timestamp uuid"""
141         then = datetime.datetime.now()
142         return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3)
143     
144     @staticmethod
145     def test():
146         import json
147         from pprint import pprint
148
149         with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:    
150             data = json.load(data_file)
151
152         data['event']['commonEventHeader']['version'] = '5.0'
153         pprint(data)
154
155
156 if __name__ == '__main__':
157     '''
158     dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284"
159     cls = DcaeLibrary()
160     #dict = cls.create_header_from_string(dictStr)
161     #print str(dict)
162     jsonStr = "[{'Node': 'onapfcnsl00', 'CheckID': 'serfHealth', 'Name': 'Serf Health Status', 'ServiceName': '', 'Notes': '', 'ModifyIndex': 6, 'Status': 'passing', 'ServiceID': '', 'ServiceTags': [], 'Output': 'Agent alive and reachable', 'CreateIndex': 6}]"
163     lsObj = cls.get_json_value_list(jsonStr, 'Status')
164     print lsObj
165     '''
166     
167     lib = DcaeLibrary()
168     lib.enable_vesc_https_auth()
169     
170     ret = lib.setup_dmaap_server()
171     print ret
172     time.sleep(100000)