- # @mock.patch.object(restcall, "call_req")
- # def test_terminate_vnf(self, mock_call_req):
- # job_id = JobUtil.create_job("VNF", JOB_TYPE.TERMINATE_VNF, self.nf_inst_id)
- #
- # nfinst = NfInstModel.objects.filter(nfinstid=self.nf_inst_id)
- # if nfinst:
- # self.failUnlessEqual(1, 1)
- # else:
- # self.failUnlessEqual(1, 0)
- #
- # mock_vals = {
- # "/api/ztevmanagerdriver/v1/1/vnfs/111/terminate":
- # [0, json.JSONEncoder().encode({"jobId": job_id}), '200'],
- # "/external-system/esr-vnfm-list/esr-vnfm/1":
- # [0, json.JSONEncoder().encode(vnfm_info), '200'],
- # "/api/resmgr/v1/vnf/1":
- # [0, json.JSONEncoder().encode({"jobId": job_id}), '200'],
- # "/api/ztevmanagerdriver/v1/1/jobs/" + job_id + "?responseId=0":
- # [0, json.JSONEncoder().encode({"jobId": job_id,
- # "responsedescriptor": {"progress": "100",
- # "status": JOB_MODEL_STATUS.FINISHED,
- # "responseid": "3",
- # "statusdescription": "creating",
- # "errorcode": "0",
- # "responsehistorylist": [
- # {"progress": "0",
- # "status": JOB_MODEL_STATUS.PROCESSING,
- # "responseid": "2",
- # "statusdescription": "creating",
- # "errorcode": "0"}]}}), '200']}
- #
- # req_data = {
- # "terminationType": "forceful",
- # "gracefulTerminationTimeout": "600"}
- #
- # def side_effect(*args):
- # return mock_vals[args[4]]
- # mock_call_req.side_effect = side_effect
- #
- # TerminateVnfs(req_data, self.nf_inst_id, job_id).run()
- # nfinst = NfInstModel.objects.filter(nfinstid=self.nf_inst_id)
- # if nfinst:
- # self.failUnlessEqual(1, 0)
- # else:
- # self.failUnlessEqual(1, 1)
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_terminate_vnf(self, mock_call_req):
+ job_id = JobUtil.create_job("VNF", JOB_TYPE.TERMINATE_VNF, self.nf_inst_id)
+
+ nfinst = NfInstModel.objects.filter(nfinstid=self.nf_inst_id)
+ if nfinst:
+ self.failUnlessEqual(1, 1)
+ else:
+ self.failUnlessEqual(1, 0)
+
+ vnf_info = {
+ "vnf-id": "vnf-id-test111",
+ "vnf-name": "vnf-name-test111",
+ "vnf-type": "vnf-type-test111",
+ "in-maint": True,
+ "is-closed-loop-disabled": False,
+ "resource-version": "1505465356262"
+ }
+ job_info = {
+ "jobId": job_id,
+ "responsedescriptor": {
+ "progress": "100",
+ "status": JOB_MODEL_STATUS.FINISHED,
+ "responseid": "3",
+ "statusdescription": "creating",
+ "errorcode": "0",
+ "responsehistorylist": [
+ {
+ "progress": "0",
+ "status": JOB_MODEL_STATUS.PROCESSING,
+ "responseid": "2",
+ "statusdescription": "creating",
+ "errorcode": "0"
+ }
+ ]
+ }
+ }
+
+ mock_vals = {
+ "/external-system/esr-vnfm-list/esr-vnfm/1?depth=all":
+ [0, json.JSONEncoder().encode(vnfm_info), '200'],
+ "/api/ztevmanagerdriver/v1/1/vnfs/111/terminate":
+ [0, json.JSONEncoder().encode({"jobId": job_id}), '200'],
+ "/api/resmgr/v1/vnf/1":
+ [0, json.JSONEncoder().encode({"jobId": job_id}), '200'],
+ "/api/ztevmanagerdriver/v1/1/jobs/" + job_id + "?responseId=0":
+ [0, json.JSONEncoder().encode(job_info), '200'],
+ "/network/generic-vnfs/generic-vnf/111?depth=all":
+ [0, json.JSONEncoder().encode(vnf_info), '200'],
+ "/network/generic-vnfs/generic-vnf/111?resource-version=1505465356262":
+ [0, json.JSONEncoder().encode({}), '200']
+ }
+
+ def side_effect(*args):
+ return mock_vals[args[4]]
+ mock_call_req.side_effect = side_effect
+
+ req_data = {
+ "terminationType": "forceful",
+ "gracefulTerminationTimeout": "600"}
+
+ TerminateVnfs(req_data, self.nf_inst_id, job_id).run()
+ nfinst = NfInstModel.objects.filter(nfinstid=self.nf_inst_id)
+ if nfinst:
+ self.failUnlessEqual(1, 0)
+ else:
+ self.failUnlessEqual(1, 1)