if NSDModel.objects.filter(id=csar_id):
raise NSLCMException("NS CSAR(%s) already exists." % csar_id)
- artifact = sdc.get_artifact(sdc.ASSETTYPE_SERVICES, csar_id)
- local_path = os.path.join(CATALOG_ROOT_PATH, csar_id)
- local_file_name = sdc.download_artifacts(artifact["toscaModelURL"], local_path)
-
- nsd_json = toscaparser.parse_nsd(local_file_name)
- nsd = json.JSONDecoder().decode(nsd_json)
+ nsd,local_file_name,nsd_json = self.get_nsd(csar_id)
nsd_id = nsd["metadata"]["id"]
if NSDModel.objects.filter(nsd_id=nsd_id):
return [0, "CSAR(%s) distributed successfully." % csar_id]
+ def get_nsd(self, csar_id):
+ artifact = sdc.get_artifact(sdc.ASSETTYPE_SERVICES, csar_id)
+ local_path = os.path.join(CATALOG_ROOT_PATH, csar_id)
+ local_file_name = sdc.download_artifacts(artifact["toscaModelURL"], local_path)
+
+ nsd_json = toscaparser.parse_nsd(local_file_name)
+ nsd = json.JSONDecoder().decode(nsd_json)
+
+ return nsd,local_file_name,nsd_json
def delete_csar(self, csar_id, force_delete):
'''
@mock.patch.object(NsPackage,'get_nsd')
def test_ns_distribute(self, mock_get_nsd):
-
-
local_file_name = "/url/local/filename"
nsd = json.JSONEncoder().encode(self.nsd_json)
mock_get_nsd.return_value = self.nsd_json,local_file_name,nsd