'vnfmId': 'b0797c9b-3da9-459c-b25c-3813e9d8fd70',
'password': 'admin',
'type': 'gvnfmdriver',
- 'createTime': '2016-10-3111: 08: 39',
+ 'createTime': '2016-10-3T11:08:39',
'description': ''
}
create_vnf_resp = {
}
}
}
- grant_resp_data = {
- "id": "1",
- "vnfInstanceId": "1",
- "vnfLcmOpOccId": "2",
- "vimConnections": [
- {
- "id": "1",
- "vimId": "1"
- }
- ]
+ grant = {
+ 'id': 'Identifier of the garnt',
+ 'vnfInstanceId': 'Identifier of the related VNF instance',
+ 'vnfLcmOpOccId': 'Identifier of the related VNF LcmOpOcc',
+ # NOT REQUIERD #
+ # 'vimConnections': [],
+ # 'zones': [],
+ # 'zoneGroups': [],
+ # 'computeReservationId': None,
+ # 'networkReservationId': None,
+ # 'storageReservationId': None,
+ # 'addResources': None,
+ # 'tempResources': None,
+ # 'removeResource': None,
+ # 'updateResource': None,
+ # 'vimAssets': None,
+ # 'extVirtualLinks': None,
+ # 'extManagedVirtualLinks': None,
+ # 'additionalParams': None,
+ '_links': {
+ 'self': {'href': 'URI of this resource'},
+ 'vnfLcmOpOcc': {'href': 'Related VNF lifecycle management operation occurrence'},
+ 'vnfInstance': {'href': 'Related VNF instance'}
+ }
}
- mock_call_req.return_value = [0, json.JSONEncoder().encode(grant_resp_data), '201']
+
+ mock_call_req.return_value = [0, json.JSONEncoder().encode(grant), '201']
response = self.client.put("/api/gvnfmdriver/v1/resource/grant",
data=json.dumps(data), content_type='application/json')
self.assertEqual(status.HTTP_201_CREATED, response.status_code)
expect_resp_data = {
- "id": "1",
+ 'id': 'Identifier of the garnt',
+ 'vnfInstanceId': 'Identifier of the related VNF instance',
+ 'vnfLcmOpOccId': 'Identifier of the related VNF LcmOpOcc',
+ # NOT REQUIERD #
+ # 'vimConnections': [],
+ # 'zones': [],
+ # 'zoneGroups': [],
+ # 'computeReservationId': None,
+ # 'networkReservationId': None,
+ # 'storageReservationId': None,
+ # 'addResources': None,
+ # 'tempResources': None,
+ # 'removeResource': None,
+ # 'updateResource': None,
+ # 'vimAssets': None,
+ # 'extVirtualLinks': None,
+ # 'extManagedVirtualLinks': None,
+ # 'additionalParams': None,
+ '_links': {
+ 'self': {'href': 'URI of this resource'},
+ 'vnfLcmOpOcc': {'href': 'Related VNF lifecycle management operation occurrence'},
+ 'vnfInstance': {'href': 'Related VNF instance'}
+ }
+ }
+ self.assertDictEqual(expect_resp_data, response.data)
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_grantvnf_failed(self, mock_call_req):
+ data = {
"vnfInstanceId": "1",
"vnfLcmOpOccId": "2",
- "vimConnections": [
+ "vnfdId": "3",
+ "flavourId": "4",
+ "operation": "INSTANTIATE",
+ "isAutomaticInvocation": True,
+ "instantiationLevelId": "5",
+ "addResources": [
{
"id": "1",
- "vimId": "1"
+ "type": "COMPUTE",
+ "vduId": "2",
+ "resourceTemplateId": "3",
+ "resourceTemplate": {
+ "vimConnectionId": "4",
+ "resourceProviderId": "5",
+ "resourceId": "6",
+ "vimLevelResourceType": "7"
+ }
+ }
+ ],
+ "placementConstraints": [
+ {
+ "affinityOrAntiAffinity": "AFFINITY",
+ "scope": "NFVI_POP",
+ "resource": [
+ {
+ "idType": "RES_MGMT",
+ "resourceId": "1",
+ "vimConnectionId": "2",
+ "resourceProviderId": "3"
+ }
+ ]
+ }
+ ],
+ "vimConstraints": [
+ {
+ "sameResourceGroup": True,
+ "resource": [
+ {
+ "idType": "RES_MGMT",
+ "resourceId": "1",
+ "vimConnectionId": "2",
+ "resourceProviderId": "3"
+ }
+ ]
+ }
+ ],
+ "additionalParams": {},
+ "_links": {
+ "vnfLcmOpOcc": {
+ "href": "1"
+ },
+ "vnfInstance": {
+ "href": "2"
}
- ]
+ }
}
- self.assertDictEqual(expect_resp_data, response.data)
+ mock_call_req.return_value = [1, json.JSONEncoder().encode(""), '201']
+ response = self.client.put("/api/gvnfmdriver/v1/resource/grant",
+ data=json.dumps(data), content_type='application/json')
+ self.assertEqual(status.HTTP_500_INTERNAL_SERVER_ERROR, response.status_code)
@mock.patch.object(restcall, 'call_req')
def test_notify(self, mock_call_req):
self.assertEqual(1, len(resp.data["csars"]))
self.assertEqual("1", resp.data["csars"][0]["csarId"])
self.assertEqual("2", resp.data["csars"][0]["vnfdId"])
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_get_vnfpkgs_failed(self, mock_call_req):
+ mock_call_req.return_value = [1, json.JSONEncoder().encode(""), '200']
+ resp = self.client.get("/api/gvnfmdriver/v1/vnfpackages")
+ self.assertEqual(status.HTTP_500_INTERNAL_SERVER_ERROR, resp.status_code)
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_get_vnflcmopocc_with_id(self, mock_call_req):
+ vnfLcmOpOccId = "99442b18-a5c7-11e8-998c-bf1755941f16"
+ vnfm_info = {
+ "vnfmId": "19ecbb3a-3242-4fa3-9926-8dfb7ddc29ee",
+ "name": "g_vnfm",
+ "type": "gvnfmdriver",
+ "vimId": "",
+ "vendor": "ZTE",
+ "version": "v1.0",
+ "description": "vnfm",
+ "certificateUrl": "",
+ "url": "http://10.74.44.11",
+ "userName": "admin",
+ "password": "admin",
+ "createTime": "2016-07-06T15:33:18"
+ }
+ dummy_single_vnf_lcm_op = {
+ "id": vnfLcmOpOccId,
+ "operationState": "STARTING",
+ "stateEnteredTime": "2018-07-09",
+ "startTime": "2018-07-09",
+ "vnfInstanceId": "cd552c9c-ab6f-11e8-b354-236c32aa91a1",
+ "grantId": None,
+ "operation": "SCALE",
+ "isAutomaticInvocation": False,
+ "operationParams": {},
+ "isCancelPending": False,
+ "cancelMode": None,
+ "error": None,
+ "resourceChanges": None,
+ "changedInfo": None,
+ "changedExtConnectivity": None,
+ "_links": {
+ "self": {
+ "href": "dem1o"
+ },
+ "vnfInstance": "demo"
+ }
+ }
+ mock_call_req.return_value = [0, json.JSONEncoder().encode(dummy_single_vnf_lcm_op), status.HTTP_200_OK]
+ resp = self.client.get("/api/gvnfmdriver/v1/%s/vnf_lcm_op_occs/%s" % (vnfm_info['vnfmId'], vnfLcmOpOccId))
+ self.assertEqual(dummy_single_vnf_lcm_op, resp.data)
+ self.assertEqual(status.HTTP_200_OK, resp.status_code)
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_get_vnflcmopocc_failed(self, mock_call_req):
+ vnfLcmOpOccId = "99442b18-a5c7-11e8-998c-bf1755941f16"
+ vnfm_info = {
+ "vnfmId": "19ecbb3a-3242-4fa3-9926-8dfb7ddc29ee",
+ "name": "g_vnfm",
+ "type": "gvnfmdriver",
+ "vimId": "",
+ "vendor": "ZTE",
+ "version": "v1.0",
+ "description": "vnfm",
+ "certificateUrl": "",
+ "url": "http://10.74.44.11",
+ "userName": "admin",
+ "password": "admin",
+ "createTime": "2016-07-06 15:33:18"
+ }
+ mock_call_req.return_value = [1, json.JSONEncoder().encode({}), status.HTTP_500_INTERNAL_SERVER_ERROR]
+ resp = self.client.get("/api/gvnfmdriver/v1/%s/vnf_lcm_op_occs/%s" % (vnfm_info['vnfmId'], vnfLcmOpOccId))
+ self.assertEqual(status.HTTP_500_INTERNAL_SERVER_ERROR, resp.status_code)
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_subscribe_successfully(self, mock_call_req):
+ lccn_subscription_request_data = {
+ "filter": {
+ "notificationTypes": ["VnfLcmOperationOccurrenceNotification"],
+ "operationTypes": ["INSTANTIATE"],
+ "operationStates": ["STARTING"],
+ },
+ "callbackUri": "http://aurl.com",
+ "authentication": {
+ "authType": ["BASIC"],
+ "paramsBasic": {
+ "username": "username",
+ "password": "password"
+ }
+ }
+ }
+ lccn_subscription_data = {
+ "id": "cd552c9c-ab6f-11e8-b354-236c32aa91a1",
+ "callbackUri": "http://aurl.com",
+ "filter": {
+ "notificationTypes": ["VnfLcmOperationOccurrenceNotification"],
+ "operationTypes": ["INSTANTIATE"],
+ "operationStates": ["STARTING"]
+ },
+ "_links": {
+ "self": {"href": "URI of this resource."}
+ },
+ }
+ mock_call_req.return_value = [0, json.JSONEncoder().encode(lccn_subscription_data), status.HTTP_201_CREATED]
+ response = self.client.post("/api/gvnfmdriver/v1/subscriptions", json.dumps(lccn_subscription_request_data),
+ content_type='application/json')
+ self.assertEqual(status.HTTP_201_CREATED, response.status_code)
+ self.assertEqual(lccn_subscription_data, response.data)
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_subscribe_failed(self, mock_call_req):
+ lccn_subscription_request_data = {
+ "filter": {
+ "notificationTypes": ["VnfLcmOperationOccurrenceNotification"],
+ "operationTypes": ["INSTANTIATE"],
+ "operationStates": ["STARTING"],
+ },
+ "callbackUri": "http://aurl.com",
+ "authentication": {
+ "authType": ["BASIC"],
+ "paramsBasic": {
+ "username": "username",
+ "password": "password"
+ }
+ }
+ }
+ mock_call_req.return_value = [1, None, status.HTTP_303_SEE_OTHER]
+ response = self.client.post("/api/gvnfmdriver/v1/subscriptions", json.dumps(lccn_subscription_request_data),
+ content_type='application/json')
+ self.assertEqual(status.HTTP_500_INTERNAL_SERVER_ERROR, response.status_code)