+ send_notification(self.csar_id, const.PKG_NOTIFICATION_TYPE.ONBOARDING,
+ const.PKG_CHANGE_TYPE.OP_STATE_CHANGE)
+
+ def create_vnfd_zip(self, csar_id, vendor_vnf_file):
+ """
+ Create VNFD zip file.
+ :param csar_id: CSAR Id
+ :param vendor_vnf_file: the vendor original package(csar)
+ :return:
+ """
+ if os.path.exists(vendor_vnf_file):
+ # create VNFD from vendor original package
+ VnfPackage().creat_vnfd(csar_id, vendor_vnf_file)
+ else:
+ try:
+ vnf_package_path = os.path.join(CATALOG_ROOT_PATH, self.csar_id)
+ vnfd_zip_file = os.path.join(vnf_package_path, 'VNFD.zip')
+ with zipfile.ZipFile(vnfd_zip_file, 'w', zipfile.ZIP_DEFLATED) as vnfd_zip:
+ def_path = os.path.join(vnf_package_path, "Definitions")
+ if os.path.exists(def_path):
+ def_files = os.listdir(def_path)
+ for def_file in def_files:
+ full_path = os.path.join(def_path, def_file)
+ vnfd_zip.write(full_path, def_file)
+ except Exception as e:
+ logger.error(e)
+ if os.path.exists(vnfd_zip_file):
+ os.remove(vnfd_zip_file)