"""
def undeploy_workflow(deploy_id):
uri = "api/workflow/v1/package/{deployId}".format(deployId=deploy_id)
- ret = req_by_msb(uri, "DELETE")
+ ret = restcall.req_by_msb(uri, "DELETE")
if ret[0] != 0:
raise NSLCMException("Status code is %s, detail is %s.", ret[2], ret[1])
return json.JSONDecoder().decode(ret[1])
"""
def exec_workflow(content):
content_str = json.JSONEncoder().encode(content)
- ret = req_by_msb("api/workflow/v1/process/instance", "POST", content_str)
+ ret = restcall.req_by_msb("api/workflow/v1/process/instance", "POST", content_str)
if ret[0] != 0:
raise NSLCMException("Status code is %s, detail is %s.", ret[2], ret[1])
return json.JSONDecoder().decode(ret[1])
self.assertEqual(1, len(WFPlanModel.objects.filter(deployed_id="3")))
@mock.patch.object(restcall, 'upload_by_msb')
- def test_force_deploy_workflow(self, mock_upload_by_msb):
+ @mock.patch.object(restcall, 'call_req')
+ def test_force_deploy_workflow(self, mock_call_req, mock_upload_by_msb):
+ mock_call_req.return_value = [0, json.JSONEncoder().encode({
+ "status": "1",
+ "message": "2"
+ }), '202']
mock_upload_by_msb.return_value = [0, json.JSONEncoder().encode({
"status": "2",
"message": "3",
force_deploy = ignore_case_get(request.data, "forceDeploy")
logger.debug("file_path is %s, force_deploy is %s", file_path, force_deploy)
if force_deploy.upper() == "TRUE":
- WFPlanModel.objects.filter().delete()
+ plans = WFPlanModel.objects.filter()
+ if len(plans) > 0:
+ activiti.undeploy_workflow(plans[0].deployed_id)
+ plans.delete()
else:
if WFPlanModel.objects.filter():
logger.warn("Already deployed.")