Add create vnf of build-in workflow
[vfc/nfvo/lcm.git] / lcm / workflows / build_in.py
1 # Copyright 2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import logging
15 import traceback
16 from threading import Thread
17 import time
18
19 from lcm.pub.utils.syscomm import fun_name
20 from lcm.pub.utils.values import ignore_case_get
21 from lcm.pub.utils import restcall
22 from lcm.pub.exceptions import NSLCMException
23
24 logger = logging.getLogger(__name__)
25
26 RESULT_OK, RESULT_NG = "0", "1"
27 JOB_ERROR = 255
28
29 g_jobs_status = {}
30
31 """
32 format of input_data
33 {
34     "jobId": uuid of job, 
35     "nsInstanceId": id of ns instance,
36     "object_context": json format of nsd,
37     "object_additionalParamForNs": json format of additional parameters for ns,
38     "object_additionalParamForVnf": json format of additional parameters for vnf,
39     "vlCount": int type of VL count,
40     "vnfCount: int type of VNF count,
41     "sfcCount": int type of SFC count, 
42     "sdnControllerId": uuid of SDN controller
43 }
44 """
45 def run_ns_instantiate(input_data):
46     logger.debug("Enter %s, input_data is %s", fun_name(), input_data)
47     job_id = ignore_case_get(input_data, "jobId")
48     ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
49     nsd_json = ignore_case_get(input_data, "object_context")
50     ns_param_json = ignore_case_get(input_data, "object_additionalParamForNs")
51     vnf_param_json = ignore_case_get(input_data, "object_additionalParamForVnf")
52     vl_count = ignore_case_get(input_data, "vlCount")
53     vnf_count = ignore_case_get(input_data, "vnfCount")
54     sfc_count = ignore_case_get(input_data, "sfcCount")
55     sdnc_id = ignore_case_get(input_data, "sdnControllerId")
56     g_jobs_status[job_id] = [1 for i in range(vnf_count)]
57     try:
58         update_job(job_id, 10, "0", "Start to create VL")
59         for i in range(vl_count):
60             create_vl(ns_inst_id, i + 1, nsd_json, ns_param_json)
61
62         update_job(job_id, 30, "0", "Start to create VNF")
63         jobs = [create_vnf(ns_inst_id, i + 1, vnf_param_json) for i in range(vnf_count)] 
64         wait_until_jobs_done(job_id, jobs)
65
66         update_job(job_id, 70, "0", "Start to create SFC")
67         for i in range(sfc_count):
68             create_sfc()
69
70         update_job(job_id, 90, "0", "Start to post deal")
71         post_deal()
72
73         update_job(job_id, 100, "0", "Create NS successfully.")
74     except NSLCMException as e:
75         logger.error("Failded to Create NS: %s", e.message)
76         update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
77     except:
78         logger.error(traceback.format_exc())
79         update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
80     finally:
81         g_jobs_status.pop(job_id)
82
83
84
85 def create_vl(ns_inst_id, vl_index, nsd, ns_param):
86     uri = "api/nslcm/v1/ns/vls"
87     data = json.JSONEncoder().encode({
88         "nsInstanceId": ns_inst_id,
89         "vlIndex": vl_index,
90         "context": nsd,
91         "additionalParamForNs": ns_param
92     })
93
94     ret = restcall.req_by_msb(uri, "POST", data)
95     if ret[0] != 0:
96         logger.error("Failed to call create_vl(%s): %s", vl_index, ret[1])
97         raise NSLCMException("Failed to call create_vl(index is %s)" % vl_index)
98
99     result = str(ret[1]["result"])
100     detail = ret[1]["detail"]
101     vl_id = ret[1]["vlId"]
102     if result != RESULT_OK:
103         logger.error("Failed to create VL(%s): %s", vl_id, detail)
104         raise NSLCMException("Failed to create VL(%s)" % vl_id)
105
106     logger.debug("Create VL(%s) successfully.", vl_id)
107
108 def create_vnf(ns_inst_id, vnf_index, nf_param):
109     uri = "/ns/vnfs"
110     data = json.JSONEncoder().encode({
111         "nsInstanceId": ns_inst_id,
112         "vnfIndex": vnf_index,
113         "additionalParamForVnf": nf_param
114     })
115
116     ret = restcall.req_by_msb(uri, "POST", data)
117     if ret[0] != 0:
118         logger.error("Failed to call create_vnf(%s): %s", vnf_index, ret[1])
119         raise NSLCMException("Failed to call create_vnf(index is %s)" % vnf_index)
120
121     vnf_inst_id = ret[1]["vnfInstId"]
122     job_id = ret[1]["jobId"]
123     logger.debug("Create VNF(%s) started.", vnf_inst_id)
124     return vnf_inst_id, job_id, vnf_index - 1
125
126 def create_sfc():
127     # TODO:
128     pass
129
130 def post_deal():
131     # TODO:
132     pass    
133
134 def update_job(job_id, progress, errcode, desc):
135     uri = "api/nslcm/v1/jobs/{jobId}".format(jobId=job_id)
136     data = json.JSONEncoder().encode({
137         "progress": progress,
138         "errcode": errcode,
139         "desc": desc
140     })
141     restcall.req_by_msb(uri, "POST", data)  
142
143 class JobWaitThread(Thread):
144     """
145     Job Wait 
146     """
147
148     def __init__(self, inst_id, job_id, index):
149         Thread.__init__(self)
150         self.inst_id = inst_id
151         self.job_id = job_id
152         self.index = index
153         self.retry_count = 60
154         self.interval_second = 3
155
156     def run(self):
157         count = 0
158         response_id, new_response_id = 0, 0
159         job_end_normal, job_timeout = False, True
160         while count < self.retry_count:
161             count = count + 1
162             time.sleep(self.interval_second)
163             uri = "/api/nslcm/v1/jobs/%s?responseId=%s" % (self.job_id, response_id)
164             ret = restcall.req_by_msb(uri, "GET")
165             if ret[0] != 0:
166                 logger.error("Failed to query job: %s:%s", ret[2], ret[1])
167                 continue
168             job_result = json.JSONDecoder().decode(ret[1])
169             if "responseDescriptor" not in job_result:
170                 logger.error("Job(%s) does not exist.", self.job_id)
171                 continue
172             progress = job_result["responseDescriptor"]["progress"]
173             new_response_id = job_result["responseDescriptor"]["responseId"]
174             job_desc = job_result["responseDescriptor"]["statusDescription"]
175             if new_response_id != response_id:
176                 logger.debug("%s:%s:%s", progress, new_response_id, job_desc)
177                 response_id = new_response_id
178                 count = 0
179             if progress == JOB_ERROR:
180                 job_timeout = False
181                 logger.error("Job(%s) failed: %s", self.job_id, job_desc)
182                 break
183             elif progress == 100:
184                 job_end_normal, job_timeout = True, False
185                 logger.info("Job(%s) ended normally", self.job_id)
186                 break
187         if job_timeout:
188             logger.error("Job(%s) timeout", self.job_id)
189         if self.job_id in g_jobs_status:
190             if job_end_normal:
191                 g_jobs_status[self.job_id][self.index] = 0
192
193 def wait_until_jobs_done(g_job_id, jobs):
194     job_threads = []
195     for inst_id, job_id, index in jobs:
196         job_threads.append(JobWaitThread(inst_id, job_id, index))
197     for t in job_threads:
198         t.start()
199     for t in job_threads:
200         t.join()
201     if g_job_id in g_jobs_status:
202         if sum(g_jobs_status[g_job_id]) > 0:
203             raise NSLCMException("Some jobs failed!")
204       
205