mock_download_artifacts.return_value = "/home/vcpe.csar"
mock_call_req.return_value = [0, json.JSONEncoder().encode([{
"uuid": "1",
- "toscaModelURL": "https://127.0.0.1:1234/sdc/v1/vcpe.csar"
+ "toscaModelURL": "https://127.0.0.1:1234/sdc/v1/vcpe.csar",
+ "distributionStatus": "DISTRIBUTED"
}]), '200']
NSPackageModel(nsPackageId="2", nsdId="VCPE_NS").save()
resp = self.client.post(
mock_download_artifacts.return_value = "/home/vcpe.csar"
mock_call_req.return_value = [0, json.JSONEncoder().encode([{
"uuid": "1",
- "toscaModelURL": "https://127.0.0.1:1234/sdc/v1/vcpe.csar"
+ "toscaModelURL": "https://127.0.0.1:1234/sdc/v1/vcpe.csar",
+ "distributionStatus": "DISTRIBUTED",
}]), '200']
resp = self.client.post(
"/api/catalog/v1/nspackages", {"csarId": "1"}, format='json')
mock_download_artifacts.return_value = "/home/vcpe.csar"
mock_call_req.return_value = [0, json.JSONEncoder().encode([{
"uuid": "1",
- "toscaModelURL": "https://127.0.0.1:1234/sdc/v1/vcpe.csar"
+ "toscaModelURL": "https://127.0.0.1:1234/sdc/v1/vcpe.csar",
+ "distributionStatus": "DISTRIBUTED"
}]), '200']
VnfPackageModel(vnfPackageId="1", vnfdId="vcpe_vfw_zte_1_0").save()
resp = self.client.post(
"CSAR(1) distributed successfully.",
resp.data["statusDescription"])
+ @mock.patch.object(sdc, 'get_artifacts')
+ def test_ns_when_not_distributed_by_sdc(self, mock_get_artifacts):
+ mock_get_artifacts.return_value = [{
+ "uuid": "1",
+ "invariantUUID": "63eaec39-ffbe-411c-a838-448f2c73f7eb",
+ "name": "underlayvpn",
+ "version": "2.0",
+ "toscaModelURL": "/sdc/v1/catalog/resources/c94490a0-f7ef-48be-b3f8-8d8662a37236/toscaModel",
+ "category": "Volte",
+ "subCategory": "VolteVNF",
+ "resourceType": "VF",
+ "lifecycleState": "CERTIFIED",
+ "distributionStatus": "DISTRIBUTION_APPROVED",
+ "lastUpdaterUserId": "jh0003"
+ }]
+ resp = self.client.post(
+ "/api/catalog/v1/nspackages", {"csarId": "1"}, format='json')
+ self.assertEqual(resp.status_code, status.HTTP_202_ACCEPTED)
+ self.assertEqual("failed", resp.data["status"])
+ self.assertEqual(
+ "The artifact (services,1) is not distributed from sdc.",
+ resp.data["statusDescription"])
+
##########################################################################
def test_ns_pkg_normal_delete(self):
ASSETTYPE_RESOURCES = "resources"
ASSETTYPE_SERVICES = "services"
+DISTRIBUTED = "DISTRIBUTED"
def call_sdc(resource, method, content=''):
artifacts = get_artifacts(asset_type)
for artifact in artifacts:
if artifact["uuid"] == csar_id:
- return artifact
+ if asset_type == ASSETTYPE_SERVICES and \
+ artifact.get("distributionStatus", None) != DISTRIBUTED:
+ raise CatalogException("The artifact (%s,%s) is not distributed from sdc." % (asset_type, csar_id))
+ else:
+ return artifact
raise CatalogException("Failed to query artifact(%s,%s) from sdc." % (asset_type, csar_id))