}
}
}
- grant_resp_data = {
- "id": "1",
- "vnfInstanceId": "1",
- "vnfLcmOpOccId": "2",
- "vimConnections": [
- {
- "id": "1",
- "vimId": "1"
- }
- ]
+ grant = {
+ 'id': 'Identifier of the garnt',
+ 'vnfInstanceId': 'Identifier of the related VNF instance',
+ 'vnfLcmOpOccId': 'Identifier of the related VNF LcmOpOcc',
+ 'vimConnections': [],
+ 'zones': [],
+ 'zoneGroups': [],
+ 'computeReservationId': None,
+ 'networkReservationId': None,
+ 'storageReservationId': None,
+ 'addResources': None,
+ 'tempResources': None,
+ 'removeResource': None,
+ 'updateResource': None,
+ 'vimAssets': None,
+ 'extVirtualLinks': None,
+ 'extManagedVirtualLinks': None,
+ 'additionalParams': {},
+ '_links': None
}
- mock_call_req.return_value = [0, json.JSONEncoder().encode(grant_resp_data), '201']
+
+ mock_call_req.return_value = [0, json.JSONEncoder().encode(grant), '201']
response = self.client.put("/api/gvnfmdriver/v1/resource/grant",
data=json.dumps(data), content_type='application/json')
self.assertEqual(status.HTTP_201_CREATED, response.status_code)
expect_resp_data = {
- "id": "1",
+ 'id': 'Identifier of the garnt',
+ 'vnfInstanceId': 'Identifier of the related VNF instance',
+ 'vnfLcmOpOccId': 'Identifier of the related VNF LcmOpOcc',
+ 'vimConnections': [],
+ 'zones': [],
+ 'zoneGroups': [],
+ 'computeReservationId': None,
+ 'networkReservationId': None,
+ 'storageReservationId': None,
+ 'addResources': None,
+ 'tempResources': None,
+ 'removeResource': None,
+ 'updateResource': None,
+ 'vimAssets': None,
+ 'extVirtualLinks': None,
+ 'extManagedVirtualLinks': None,
+ 'additionalParams': {},
+ '_links': None
+ }
+ self.assertDictEqual(expect_resp_data, response.data)
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_grantvnf_failed(self, mock_call_req):
+ data = {
"vnfInstanceId": "1",
"vnfLcmOpOccId": "2",
- "vimConnections": [
+ "vnfdId": "3",
+ "flavourId": "4",
+ "operation": "INSTANTIATE",
+ "isAutomaticInvocation": True,
+ "instantiationLevelId": "5",
+ "addResources": [
{
"id": "1",
- "vimId": "1"
+ "type": "COMPUTE",
+ "vduId": "2",
+ "resourceTemplateId": "3",
+ "resourceTemplate": {
+ "vimConnectionId": "4",
+ "resourceProviderId": "5",
+ "resourceId": "6",
+ "vimLevelResourceType": "7"
+ }
+ }
+ ],
+ "placementConstraints": [
+ {
+ "affinityOrAntiAffinity": "AFFINITY",
+ "scope": "NFVI_POP",
+ "resource": [
+ {
+ "idType": "RES_MGMT",
+ "resourceId": "1",
+ "vimConnectionId": "2",
+ "resourceProviderId": "3"
+ }
+ ]
+ }
+ ],
+ "vimConstraints": [
+ {
+ "sameResourceGroup": True,
+ "resource": [
+ {
+ "idType": "RES_MGMT",
+ "resourceId": "1",
+ "vimConnectionId": "2",
+ "resourceProviderId": "3"
+ }
+ ]
+ }
+ ],
+ "additionalParams": {},
+ "_links": {
+ "vnfLcmOpOcc": {
+ "href": "1"
+ },
+ "vnfInstance": {
+ "href": "2"
}
- ]
+ }
}
- self.assertDictEqual(expect_resp_data, response.data)
+ mock_call_req.return_value = [1, json.JSONEncoder().encode(""), '201']
+ response = self.client.put("/api/gvnfmdriver/v1/resource/grant",
+ data=json.dumps(data), content_type='application/json')
+ self.assertEqual(status.HTTP_500_INTERNAL_SERVER_ERROR, response.status_code)
@mock.patch.object(restcall, 'call_req')
def test_notify(self, mock_call_req):
r2 = [0, json.JSONEncoder().encode(vim_info), "200"]
mock_call_req.side_effect = [r2]
req_data = {
- "nfvoid": "1",
- "vnfmid": "876543211",
- "vimid": "6543211",
- "timestamp": "1234567890",
- "vnfinstanceid": "1",
- "eventtype": "0",
- "vmlist": [
- {
- "vmflavor": "SMP",
- "vmnumber": "3",
- "vmidlist ": [
- "vmuuid"
- ]
+ "vnfmInstId": "876543211",
+ "notificationType": "string",
+ "subscriptionId": "string",
+ "timeStamp": "1234567890",
+ "notificationStatus": "START",
+ "operationState": "STARTING",
+ "vnfInstanceId": "string",
+ "operation": "INSTANTIATE",
+ "isAutomaticInvocation": True,
+ "vnfLcmOpOccId": "string",
+ "affectedVnfcs": [{
+ "id": "string",
+ "vduId": "string",
+ "changeType": "ADDED",
+ "computeResource": {
+ "vimConnectionId": "string",
+ "resourceProviderId": "string",
+ "resourceId": "string",
+ "vimLevelResourceType": "string"
},
- {
- "vmflavor": "CMP",
- "vmnumber": "3",
- "vmidlist ": [
- "vmuuid"
- ]
+ "metadata": {},
+ "affectedVnfcCpIds": [],
+ "addedStorageResourceIds": [],
+ "removedStorageResourceIds": [],
+ }],
+ "affectedVirtualLinks": [{
+ "id": "string",
+ "virtualLinkDescId": "string",
+ "changeType": "ADDED",
+ "networkResource": {
+ "vimConnectionId": "string",
+ "resourceProviderId": "string",
+ "resourceId": "string",
+ "vimLevelResourceType": "network",
}
- ]
+ }],
+ "affectedVirtualStorages": [{
+ "id": "string",
+ "virtualStorageDescId": "string",
+ "changeType": "ADDED",
+ "storageResource": {
+ "vimConnectionId": "string",
+ "resourceProviderId": "string",
+ "resourceId": "string",
+ "vimLevelResourceType": "network",
+ },
+ "metadata": {}
+ }],
+ "changedInfo": {
+ "vnfInstanceName": "string",
+ "vnfInstanceDescription": "string",
+ "vnfConfigurableProperties": {},
+ "metadata": {},
+ "extensions": {},
+ "vimConnectionInfo": [{
+ "id": "string",
+ "vimId": "string",
+ "vimType": "string",
+ "interfaceInfo": {},
+ "accessInfo": {},
+ "extra": {}
+ }],
+ "vnfPkgId": "string",
+ "vnfdId": "string",
+ "vnfProvider": "string",
+ "vnfProductName": "string",
+ "vnfSoftwareVersion": "string",
+ "vnfdVersion": "string"
+ },
+ "changedExtConnectivity": [{
+ "id": "string",
+ "resourceHandle": {
+ "vimConnectionId": "string",
+ "resourceProviderId": "string",
+ "resourceId": "string",
+ "vimLevelResourceType": "string"
+ },
+ "extLinkPorts": [{
+ "id": "string",
+ "resourceHandle": {
+ "vimConnectionId": "string",
+ "resourceProviderId": "string",
+ "resourceId": "string",
+ "vimLevelResourceType": "string"
+ },
+ "cpInstanceId": "string"
+ }]
+ }]
}
response = self.client.post("/api/gvnfmdriver/v1/vnfs/lifecyclechangesnotification",
data=json.dumps(req_data),
self.assertEqual(1, len(resp.data["csars"]))
self.assertEqual("1", resp.data["csars"][0]["csarId"])
self.assertEqual("2", resp.data["csars"][0]["vnfdId"])
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_get_vnfpkgs_failed(self, mock_call_req):
+ mock_call_req.return_value = [1, json.JSONEncoder().encode(""), '200']
+ resp = self.client.get("/api/gvnfmdriver/v1/vnfpackages")
+ self.assertEqual(status.HTTP_500_INTERNAL_SERVER_ERROR, resp.status_code)
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_get_vnflcmopocc_with_id(self, mock_call_req):
+ vnfLcmOpOccId = "99442b18-a5c7-11e8-998c-bf1755941f16"
+ vnfm_info = {
+ "vnfmId": "19ecbb3a-3242-4fa3-9926-8dfb7ddc29ee",
+ "name": "g_vnfm",
+ "type": "gvnfmdriver",
+ "vimId": "",
+ "vendor": "ZTE",
+ "version": "v1.0",
+ "description": "vnfm",
+ "certificateUrl": "",
+ "url": "http://10.74.44.11",
+ "userName": "admin",
+ "password": "admin",
+ "createTime": "2016-07-06 15:33:18"
+ }
+ dummy_single_vnf_lcm_op = {
+ "id": vnfLcmOpOccId,
+ "operationState": "STARTING",
+ "stateEnteredTime": "2018-07-09",
+ "startTime": "2018-07-09",
+ "vnfInstanceId": "cd552c9c-ab6f-11e8-b354-236c32aa91a1",
+ "grantId": None,
+ "operation": "SCALE",
+ "isAutomaticInvocation": False,
+ "operationParams": {},
+ "isCancelPending": False,
+ "cancelMode": None,
+ "error": None,
+ "resourceChanges": None,
+ "changedInfo": None,
+ "changedExtConnectivity": None,
+ "_links": {
+ "self": {
+ "href": "dem1o"
+ },
+ "vnfInstance": "demo"
+ }
+ }
+ mock_call_req.return_value = [0, json.JSONEncoder().encode(dummy_single_vnf_lcm_op), status.HTTP_200_OK]
+ resp = self.client.get("/api/gvnfmdriver/v1/%s/vnf_lcm_op_occs/%s" % (vnfm_info['vnfmId'], vnfLcmOpOccId))
+ self.assertEqual(dummy_single_vnf_lcm_op, resp.data)
+ self.assertEqual(status.HTTP_200_OK, resp.status_code)
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_get_vnflcmopocc_failed(self, mock_call_req):
+ vnfLcmOpOccId = "99442b18-a5c7-11e8-998c-bf1755941f16"
+ vnfm_info = {
+ "vnfmId": "19ecbb3a-3242-4fa3-9926-8dfb7ddc29ee",
+ "name": "g_vnfm",
+ "type": "gvnfmdriver",
+ "vimId": "",
+ "vendor": "ZTE",
+ "version": "v1.0",
+ "description": "vnfm",
+ "certificateUrl": "",
+ "url": "http://10.74.44.11",
+ "userName": "admin",
+ "password": "admin",
+ "createTime": "2016-07-06 15:33:18"
+ }
+ mock_call_req.return_value = [1, json.JSONEncoder().encode({}), status.HTTP_500_INTERNAL_SERVER_ERROR]
+ resp = self.client.get("/api/gvnfmdriver/v1/%s/vnf_lcm_op_occs/%s" % (vnfm_info['vnfmId'], vnfLcmOpOccId))
+ self.assertEqual(status.HTTP_500_INTERNAL_SERVER_ERROR, resp.status_code)