from enum import Enum
import argparse
import sys
+import colorama
from colorama import Fore
import urllib.parse
from os import path
def __str__(self):
return self.name
-#Read the ingress controller http and https ports from the kubernetes config
+#Read the ingress controller http and https ports from the kubernetes cluster
def find_ingress_ports(v1):
svc = v1.list_namespaced_service(K8S_INGRESS_NS)
http_port = 0
http_port = pinfo.node_port
elif pinfo and pinfo.name == 'https':
https_port = pinfo.node_port
- return http_port,https_port
+
+ return http_port,https_port
+ else: return(80,443)
# List all ingress devices
def list_ingress(xv1b):
for mode in ScanMode:
retstatus, out = scan_single_port(host,port,mode)
if not retstatus:
- result = port, mode, out.getcode(), out.getheaders(),mode
+ result = port, mode, out.getcode(), out.read().decode('utf-8'),mode
break
else:
result = port, retstatus, out, None,mode
if mode==ScanMode.HTTP:
retstatus, out = scan_single_port(host,http,ScanMode.HTTP)
if not retstatus:
- return host, ScanMode.HTTP, out.getcode(), out.getheaders(), mode
+ return host, ScanMode.HTTP, out.getcode(), out.read().decode('utf-8'), mode
else:
return host, retstatus, out, None, mode
elif mode==ScanMode.HTTPS:
retstatus, out = scan_single_port(host,https,ScanMode.HTTPS)
if not retstatus:
- return host, ScanMode.HTTPS, out.getcode(), out.getheaders(), mode
+ return host, ScanMode.HTTPS, out.getcode(), out.read().decode('utf-8'), mode
else:
retstatus, out = scan_single_port(host,https,ScanMode.HTTPU)
if not retstatus:
- return host, ScanMode.HTTPU, out.getcode(), out.getheaders(), mode
+ return host, ScanMode.HTTPU, out.getcode(), out.read().decode('utf-8'), mode
else:
return host, retstatus, out, None, mode
if not out: out = str(retstatus)
print( Fore.RED, '[ERROR ' +str(mode) +']', Fore.RESET,'\t', str(out))
+# Visualize compare results
+def console_compare_visualisation(cname,d1,d2):
+ print(Fore.YELLOW,end='')
+ print(cname, end='\t',sep='\t')
+ if d1!=d2:
+ print(Fore.RED + '[ERROR] '+ Fore.RESET)
+ if d1[0]!=d2[0]:
+ print('\tCode:',d1[0],'!=',d2[0])
+ if d1[1]!=d2[1]:
+ print('\t******** Response #1 ********\n',d1[1])
+ print('\t******** Response #2 ********\n',d2[1])
+ else:
+ print(Fore.GREEN + '[OK ',d1[0],']', Fore.RESET,sep='')
+ if VERBOSE and d1[1]:
+ print(d1[1])
+
+
# Port detector type
def check_onap_ports():
print("Scanning onap NodePorts")
for name,hosts in check_list.items():
print(Fore.GREEN, name + Fore.RESET,":", *hosts[0], Fore.RED+':', hosts[1],Fore.RESET)
+#Scan and compare nodeports and ingress check for results
+def compare_nodeports_and_ingress():
+ ihttp,ihttps = find_ingress_ports(v1)
+ print('Scanning nodeport services ...')
+ check_list = list_nodeports(v1)
+ if not check_list:
+ print(Fore.RED + 'Unable to find any declared node port in the K8S cluster', Fore.RESET)
+ valid_results = {}
+ for k,v in check_list.items():
+ for port in v:
+ nodeport_results = scan_portn(port)
+ if isinstance(nodeport_results[1],ScanMode) and nodeport_results[2] != 404:
+ valid_results[k] = nodeport_results
+ if VERBOSE: console_visualisation(k,*nodeport_results)
+ check_list = list_ingress(v1b)
+ if not check_list:
+ print(Fore.RED+ 'Unable to find any declared ingress service in the K8S cluster', Fore.RESET)
+ print('Scanning ingress services ...')
+ ing_valid_results = {}
+ for k,v in check_list.items():
+ for host in v[0]:
+ ingress_results = scan_port(host,ihttp,ihttps,v[1])
+ if isinstance(ingress_results[1],ScanMode) and ingress_results[2]!=404:
+ ing_valid_results[k] = ingress_results
+ if VERBOSE: console_visualisation(k,*ingress_results,httpcodes=[404])
+ ks1 = set(valid_results.keys())
+ ks2 = set(ing_valid_results.keys())
+ diff_keys = (ks1 - ks2) | (ks2 - ks1)
+ common_keys = ks1 & ks2
+ if VERBOSE and diff_keys:
+ print(Fore.BLUE + '[WARNING] Non matching nodes and ingress list:')
+ for key in diff_keys: print(key,sep='\t')
+ print(Fore.RESET + 'Please check is it correct.')
+ print('Matching ingress and nodeport host scan results:')
+ for scan_key in common_keys:
+ s1 = valid_results[scan_key][2:4]
+ s2 = ing_valid_results[scan_key][2:4]
+ num_failures = 0
+ if s1!=s2: ++num_failures
+ console_compare_visualisation(scan_key,s1,s2)
+ return num_failures
+
+def kube_config_exists(conf):
+ try:
+ assert path.exists(conf)
+ except AssertionError:
+ raise argparse.ArgumentTypeError(f'Fatal! K8S config {conf} does not exist')
+ else:
+ return conf
+
if __name__ == "__main__":
+ colorama.init()
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
- parser.add_argument("--scan-nodeport",
+ command_group = parser.add_mutually_exclusive_group()
+ command_group.add_argument("--scan-nodeport",
default=False, action='store_true',
help='Scan onap for node services'
)
- parser.add_argument("--scan-ingress",
+ command_group.add_argument("--scan-ingress",
default=False, action='store_true',
help='Scan onap for ingress services'
)
+ command_group.add_argument("--scan-and-compare",
+ default=False, action='store_true',
+ help='Scan nodeports and ingress and compare results'
+ )
parser.add_argument( "--namespace",
default='onap', action='store',
help = 'kubernetes onap namespace'
)
parser.add_argument( "--conf",
default='~/.kube/config', action='store',
- help = 'kubernetes config file'
+ help = 'kubernetes config file',
+ type = kube_config_exists
)
parser.add_argument("--verbose",
default=False, action='store_true',
K8S_NAMESPACE = args.namespace
K8S_INGRESS_NS = args.ingress_namespace
VERBOSE = args.verbose
- try:
- assert path.exists(args.conf)
- except AssertionError:
- print('Fatal! K8S config',args.conf, 'does not exist',file=sys.stderr)
- sys.exit(-1)
config.load_kube_config(config_file=args.conf)
v1 = client.CoreV1Api()
v1b = client.ExtensionsV1beta1Api()
v1c = client.Configuration()
if args.scan_nodeport: check_onap_ports()
elif args.scan_ingress: check_onap_ingress()
+ elif args.scan_and_compare: sys.exit(compare_nodeports_and_ingress())
else: onap_list_all()