pass
def on_distribute(self, csar_id):
+ """
+ Fetch NS package csar from SDC
+ :param csar_id:
+ :return:
+ """
if NSPackageModel.objects.filter(nsPackageId=csar_id):
return [1, "NS CSAR(%s) already exists." % csar_id]
DISABLED="DISABLED"
)
+# CREDENTIALS
AUTH_TYPES = [
"BASIC",
"OAUTH2_CLIENT_CREDENTIALS",
OAUTH2_CLIENT_CREDENTIALS = "OAUTH2_CLIENT_CREDENTIALS"
+# subscription & notification
NOTIFICATION_TYPES = [
"VnfPackageOnboardingNotification",
"VnfPackageChangeNotification"
class HealthCheckView(APIView):
+ """
+ APL Health Check
+ """
+
@swagger_auto_schema(
tags=[TAG_HEALTH_CHECK],
responses={
def ns_info_rd(request, **kwargs):
nsd_info_id = kwargs.get("nsdInfoId")
if request.method == 'GET':
+ # Read information about an individual NS descriptor resource.
data = NsDescriptor().query_single(nsd_info_id)
nsd_info = validate_data(data, NsdInfoSerializer)
return Response(data=nsd_info.data, status=status.HTTP_200_OK)
if request.method == 'DELETE':
+ # Delete an individual NS descriptor resource.
NsDescriptor().delete_single(nsd_info_id)
return Response(status=status.HTTP_204_NO_CONTENT)
@view_safe_call_with_log(logger=logger)
def ns_descriptors_rc(request):
if request.method == 'POST':
+ # Create a new NS descriptor resource.
create_nsd_info_request = validate_data(request.data, CreateNsdInfoRequestSerializer)
data = NsDescriptor().create(create_nsd_info_request.data)
validate_data(data, NsdInfoSerializer)
return Response(data=data, status=status.HTTP_201_CREATED)
if request.method == 'GET':
+ # Query information about multiple NS descriptor resources.
nsdId = request.query_params.get("nsdId", None)
data = NsDescriptor().query_multiple(nsdId)
validate_data(data, NsdInfosSerializer)
def nsd_content_ru(request, **kwargs):
nsd_info_id = kwargs.get("nsdInfoId")
if request.method == 'PUT':
+ # Upload the content of a NSD.
files = request.FILES.getlist('file')
try:
local_file_name = NsDescriptor().upload(nsd_info_id, files[0])
raise e
if request.method == 'GET':
+ # Fetch the content of a NSD.
file_range = request.META.get('HTTP_RANGE')
file_iterator = NsDescriptor().download(nsd_info_id, file_range)
return StreamingHttpResponse(file_iterator, status=status.HTTP_200_OK)
@view_safe_call_with_log(logger=logger)
def nsd_subscription_rc(request):
if request.method == 'POST':
+ # Subscribe to NSD and PNFD change notifications.
logger.debug("SubscribeNotification--post::> %s" % request.data)
nsdm_subscription_request = \
validate_data(request.data,
return Response(data=subscription, status=status.HTTP_201_CREATED)
if request.method == 'GET':
+ # Query multiple subscriptions.
logger.debug("Subscription Notification GET %s" % request.query_params)
request_query_params = {}
if request.query_params:
@api_view(http_method_names=['GET', 'DELETE'])
@view_safe_call_with_log(logger=logger)
def nsd_subscription_rd(request, **kwargs):
+ """
+ Individual subscription
+ :param request:
+ :param kwargs:
+ :return:
+ """
subscription_id = kwargs.get("subscriptionId")
validate_data({'subscription_id': subscription_id}, NsdmSubscriptionIdSerializer)
if request.method == 'GET':
+ # Read an individual subscription resource
subscription_data = NsdmSubscription().query_single_subscription(subscription_id)
subscription = validate_data(subscription_data, NsdmSubscriptionSerializer)
return Response(data=subscription.data, status=status.HTTP_200_OK)
elif request.method == 'DELETE':
+ # Terminate a subscription
subscription_data = NsdmSubscription().delete_single_subscription(subscription_id)
return Response(status=status.HTTP_204_NO_CONTENT)
)
@view_safe_call_with_log(logger=logger)
def get(self, request, vnfPkgId, artifactPath):
+ """
+ Fetch artifact from vnf package
+ :param request:
+ :param vnfPkgId:
+ :param artifactPath:
+ :return:
+ """
logger.debug("FetchVnfPkgmArtifactsView--get::> ")
resp_data = FetchVnfPkgArtifact().fetch(vnfPkgId, artifactPath)
class CreateQuerySubscriptionView(APIView):
+ """
+ This resource represents subscriptions.
+ The client can use this resource to subscribe to notifications related to NS lifecycle management,
+ and to query its subscriptions.
+ """
@swagger_auto_schema(
tags=[TAG_VNF_PACKAGE_API],
)
@view_safe_call_with_log(logger=logger)
def post(self, request):
+ """
+ The POST method creates a new subscription
+ :param request:
+ :return:
+ """
logger.debug("Create VNF package Subscription> %s" % request.data)
vnf_pkg_subscription_request = validate_req_data(request.data, PkgmSubscriptionRequestSerializer)
data = CreateSubscription(vnf_pkg_subscription_request.data).do_biz()
)
@view_safe_call_with_log(logger=logger)
def get(self, request):
+ """
+ The GET method queries the list of active subscriptions of the functional block that invokes the method.
+ It can be used e.g. for resynchronization after error situations.
+ :param request:
+ :return:
+ """
logger.debug("SubscribeNotification--get::> %s" % request.query_params)
if request.query_params and not set(request.query_params).issubset(set(VALID_FILTERS)):
class QueryTerminateSubscriptionView(APIView):
+ """
+ This resource represents an individual subscription.
+ It can be used by the client to read and to terminate a subscription to Notifications related to NS lifecycle management.
+ """
@swagger_auto_schema(
tags=[TAG_VNF_PACKAGE_API],
)
@view_safe_call_with_log(logger=logger)
def get(self, request, subscriptionId):
+ """
+ The GET method retrieves information about a subscription by reading an individual subscription resource.
+ :param request:
+ :param subscriptionId:
+ :return:
+ """
logger.debug("SubscribeNotification--get::> %s" % subscriptionId)
resp_data = QuerySubscription().query_single_subscription(subscriptionId)
)
@view_safe_call_with_log(logger=logger)
def delete(self, request, subscriptionId):
+ """
+ The DELETE method terminates an individual subscription.
+ :param request:
+ :param subscriptionId:
+ :return:
+ """
logger.debug("SubscribeNotification--get::> %s" % subscriptionId)
TerminateSubscription().terminate(subscriptionId)
class PkgOnboardingNotificationView(APIView):
+ """
+ This resource represents a notification endpoint about package onboarding
+ """
+
@swagger_auto_schema(
tags=[TAG_VNF_PACKAGE_API],
request_body=PkgOnboardingNotificationSerializer,
class PkgChangeNotificationView(APIView):
+ """
+ This resource represents a notification endpoint about package change
+ """
+
@swagger_auto_schema(
tags=[TAG_VNF_PACKAGE_API],
request_body=PkgChangeNotificationSerializer,
@view_safe_call_with_log(logger=logger)
def vnf_packages_rc(request):
if request.method == 'GET':
+ # Query VNF packages information
logger.debug("Query VNF packages> %s" % request.data)
data = VnfPackage().query_multiple()
validate_data(data, VnfPkgInfosSerializer)
return Response(data=data, status=status.HTTP_200_OK)
if request.method == 'POST':
+ # Create a new individual VNF package resource
logger.debug("Create VNF package> %s" % request.data)
create_vnf_pkg_info_request = validate_req_data(request.data, CreateVnfPkgInfoRequestSerializer)
data = VnfPackage().create_vnf_pkg(create_vnf_pkg_info_request.data)
@api_view(http_method_names=["GET"])
@view_safe_call_with_log(logger=logger)
def vnfd_rd(request, **kwargs):
+ """
+ Get the VNFD by VNF package id
+ :param request:
+ :param kwargs:
+ :return:
+ """
vnf_pkg_id = kwargs.get("vnfPkgId")
logger.debug("Read VNFD for VNF package %s" % vnf_pkg_id)
try:
def package_content_ru(request, **kwargs):
vnf_pkg_id = kwargs.get("vnfPkgId")
if request.method == "PUT":
+ # Upload a VNF package by providing the content of the VNF package
logger.debug("Upload VNF package %s" % vnf_pkg_id)
files = request.FILES.getlist('file')
try:
raise e
if request.method == "GET":
+ # Fetch an on-boarded VNF package
file_range = request.META.get('HTTP_RANGE')
file_iterator = VnfPackage().download(vnf_pkg_id, file_range)
return StreamingHttpResponse(file_iterator, status=status.HTTP_200_OK)
@api_view(http_method_names=['POST'])
@view_safe_call_with_log(logger=logger)
def upload_from_uri_c(request, **kwargs):
+ """
+ Upload a VNF package by providing the address information of the VNF package
+ :param request:
+ :param kwargs:
+ :return:
+ """
vnf_pkg_id = kwargs.get("vnfPkgId")
try:
upload_vnf_from_uri_request = validate_req_data(request.data,
def vnf_package_rd(request, **kwargs):
vnf_pkg_id = kwargs.get("vnfPkgId")
if request.method == 'GET':
+ # Read information about an individual VNF package
logger.debug("Query an individual VNF package> %s" % request.data)
data = VnfPackage().query_single(vnf_pkg_id)
validate_data(data, VnfPkgInfoSerializer)
return Response(data=data, status=status.HTTP_200_OK)
if request.method == 'DELETE':
+ # Delete an individual VNF package
logger.debug("Delete an individual VNF package> %s" % request.data)
VnfPackage().delete_vnf_pkg(vnf_pkg_id)
return Response(data=None, status=status.HTTP_204_NO_CONTENT)
def make_dirs(path):
+ """
+ Make directories
+ :param path:
+ :return:
+ """
if not os.path.exists(path):
os.makedirs(path, 0o777)
def delete_dirs(path):
+ """
+ Delete directories
+ :param path:
+ :return:
+ """
try:
if os.path.exists(path):
shutil.rmtree(path)
def download_file_from_http(url, local_dir, file_name):
+ """
+ Download file from http and save to local dir
+ :param url:
+ :param local_dir:
+ :param file_name:
+ :return:
+ """
local_file_name = os.path.join(local_dir, file_name)
is_download_ok = False
try:
def unzip_file(zip_src, dst_dir, csar_path):
+ """
+ Unzip the zip file to given path
+ :param zip_src:
+ :param dst_dir:
+ :param csar_path:
+ :return:
+ """
logger.debug("unzip_file %s to %s.", zip_src, dst_dir)
if os.path.exists(zip_src):
logger.debug("unzip_file %s.", zip_src)
def unzip_csar(zip_src, dst_dir):
+ """
+ Unzip csar package
+ :param zip_src:
+ :param dst_dir:
+ :return:
+ """
if os.path.exists(zip_src):
fz = zipfile.ZipFile(zip_src, 'r')
for file in fz.namelist():
def unzip_csar_to_tmp(zip_src):
+ """
+ Unzip csar package to temp path
+ :param zip_src:
+ :return:
+ """
dirpath = tempfile.mkdtemp()
zip_ref = zipfile.ZipFile(zip_src, 'r')
zip_ref.extractall(dirpath)
def get_artifact_path(vnf_path, artifact_file):
+ """
+ Get the path of artifact file
+ :param vnf_path:
+ :param artifact_file:
+ :return:
+ """
for root, dirs, files in os.walk(vnf_path):
if artifact_file in files:
return os.path.join(root, artifact_file)
def end_with(_s_in, *suffix):
+ """
+ Check if end with given suffix
+ :param _s_in:
+ :param suffix:
+ :return:
+ """
array = map(_s_in.endswith, suffix)
if True in array:
return True
def filter_files(search_path, suffix):
+ """
+ Filter file by given suffix
+ :param search_path:
+ :param suffix:
+ :return:
+ """
f_find = []
file_list = os.listdir(search_path)
for file_item in file_list:
def recreate_dir(path):
+ """
+ Recreate directory
+ :param path:
+ :return:
+ """
if os.path.exists(path):
shutil.rmtree(path)
os.makedirs(path, mode=0o777)
def copy(src_file, dest_dir, new_file_name=None):
+ """
+ Copy file to given dest dir
+ :param src_file:
+ :param dest_dir:
+ :param new_file_name:
+ :return:
+ """
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
if new_file_name is None: