25bb3c9387f2272a8d3312eadc2f866964a905d3
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / ansible-awx-executor / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / ansible / executor / ComponentRemoteAnsibleExecutor.kt
1 /*
2  *  Copyright © 2019 Bell Canada.
3  *  Modifications Copyright © 2018-2019 IBM.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.functions.ansible.executor
19
20 import com.fasterxml.jackson.databind.JsonNode
21 import com.fasterxml.jackson.databind.ObjectMapper
22 import com.fasterxml.jackson.databind.node.TextNode
23 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
24 import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BluePrintRestLibPropertyService
25 import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService
26 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
27 import org.onap.ccsdk.cds.controllerblueprints.core.*
28 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
29 import org.slf4j.LoggerFactory
30 import org.springframework.beans.factory.config.ConfigurableBeanFactory
31 import org.springframework.context.annotation.Scope
32 import org.springframework.http.HttpMethod
33 import org.springframework.stereotype.Component
34 import java.net.URI
35 import java.net.URLEncoder
36 import java.util.*
37
38 /**
39  * ComponentRemoteAnsibleExecutor
40  *
41  * Component that launches a run of a job template (INPUT_JOB_TEMPLATE_NAME) representing an Ansible playbook,
42  * and its parameters, via the AWX server identified by the INPUT_ENDPOINT_SELECTOR parameter.
43  *
44  * It supports extra_vars, limit, tags, skip-tags, inventory (by name or Id) Ansible parameters.
45  * It reports the results of the execution via properties, named execute-command-status and execute-command-logs
46  *
47  * @author Serge Simard
48  */
49 @Component("component-remote-ansible-executor")
50 @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
51 open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertyService: BluePrintRestLibPropertyService,
52                                           private val mapper: ObjectMapper)
53     : AbstractComponentFunction() {
54
55     // HTTP related constants
56     private val HTTP_SUCCESS = 200..202
57     private val GET = HttpMethod.GET.name
58     private val POST = HttpMethod.POST.name
59     private val plainTextHeaders = mapOf("Accept" to "text/plain")
60
61     var checkDelay: Long = 15_000
62
63     companion object {
64         private val log = LoggerFactory.getLogger(ComponentRemoteAnsibleExecutor::class.java)
65
66         // input fields names accepted by this executor
67         const val INPUT_ENDPOINT_SELECTOR = "endpoint-selector"
68         const val INPUT_JOB_TEMPLATE_NAME = "job-template-name"
69         const val INPUT_WORKFLOW_JOB_TEMPLATE_NAME = "workflow-job-template-id"
70         const val INPUT_LIMIT_TO_HOST = "limit"
71         const val INPUT_INVENTORY = "inventory"
72         const val INPUT_EXTRA_VARS = "extra-vars"
73         const val INPUT_TAGS = "tags"
74         const val INPUT_SKIP_TAGS = "skip-tags"
75
76         // output fields names (and values) populated by this executor; aligned with job details status field values.
77         const val ATTRIBUTE_EXEC_CMD_ARTIFACTS = "ansible-artifacts"
78         const val ATTRIBUTE_EXEC_CMD_STATUS = "ansible-command-status"
79         const val ATTRIBUTE_EXEC_CMD_LOG = "ansible-command-logs"
80         const val ATTRIBUTE_EXEC_CMD_STATUS_ERROR = "error"
81     }
82
83     override suspend fun processNB(executionRequest: ExecutionServiceInput) {
84
85         try {
86             val restClientService = getAWXRestClient()
87
88             // Get either a job template name or a workflow template name property
89             var workflowURIPrefix = ""
90             var jobTemplateName = getOperationInput(INPUT_JOB_TEMPLATE_NAME).returnNullIfMissing()?.textValue() ?: ""
91             val isWorkflowJT = jobTemplateName.isBlank()
92             if (isWorkflowJT) {
93                 jobTemplateName = getOperationInput(INPUT_WORKFLOW_JOB_TEMPLATE_NAME).asText()
94                 workflowURIPrefix = "workflow_"
95             }
96
97             val jtId = lookupJobTemplateIDByName(restClientService, jobTemplateName, workflowURIPrefix)
98             if (jtId.isNotEmpty()) {
99                 runJobTemplateOnAWX(restClientService, jobTemplateName, jtId, workflowURIPrefix)
100             } else {
101                 val message = "Workflow/Job template ${jobTemplateName} does not exists"
102                 log.error(message)
103                 setNodeOutputErrors(ATTRIBUTE_EXEC_CMD_STATUS_ERROR, message)
104             }
105         } catch (e: Exception) {
106             log.error("Failed to process on remote executor (${e.message})", e)
107             setNodeOutputErrors(ATTRIBUTE_EXEC_CMD_STATUS_ERROR, "Failed to process on remote executor (${e.message})")
108         }
109     }
110
111
112     override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) {
113         val message = "Error in ComponentRemoteAnsibleExecutor : ${runtimeException.message}"
114         log.error(message, runtimeException)
115         setNodeOutputErrors(ATTRIBUTE_EXEC_CMD_STATUS_ERROR, message)
116     }
117
118     /** Creates a TokenAuthRestClientService, since this executor expect type property to be "token-auth" and the
119      * token to be an OAuth token (access_token response field) generated via the AWX /api/o/token rest endpoint
120      * The token field is of the form "Bearer access_token_from_response", for example :
121      *  "blueprintsprocessor.restclient.awx.type=token-auth"
122      *  "blueprintsprocessor.restclient.awx.url=http://awx-endpoint"
123      *  "blueprintsprocessor.restclient.awx.token=Bearer J9gEtMDzxcqw25574fioY9VAhLDIs1"
124      *
125      * Also supports json endpoint definition via DSL entry, e.g.:
126      *     "ansible-remote-endpoint": {
127      *        "type": "token-auth",
128      *        "url": "http://awx-endpoint",
129      *        "token": "Bearer J9gEtMDzxcqw25574fioY9VAhLDIs1"
130      *     }
131      */
132     private fun getAWXRestClient(): BlueprintWebClientService {
133
134         val endpointSelector = getOperationInput(INPUT_ENDPOINT_SELECTOR)
135
136         try {
137             return blueprintRestLibPropertyService.blueprintWebClientService(endpointSelector)
138         } catch (e: NoSuchElementException) {
139             throw IllegalArgumentException("No value provided for input selector $endpointSelector", e)
140         }
141     }
142
143     /**
144      * Finds the job template ID based on the job template name provided in the request
145      */
146     private fun lookupJobTemplateIDByName(awxClient: BlueprintWebClientService, job_template_name: String?,
147                                           workflowPrefix : String) : String {
148         val encodedJTName = URI(null, null,
149                 "/api/v2/${workflowPrefix}job_templates/${job_template_name}/",
150                 null, null).rawPath
151
152         // Get Job Template details by name
153         var response = awxClient.exchangeResource(GET, encodedJTName, "")
154         val jtDetails: JsonNode = mapper.readTree(response.body)
155         return jtDetails.at("/id").asText()
156     }
157
158     /**
159      * Performs the job template execution on AWX, ie. prepare arguments as per job template
160      * requirements (ask fields) and provided overriding values. Then it launches the run, and monitors
161      * its execution. Finally, it retrieves the job results via the stdout api.
162      * The status and output attributes are populated in the process.
163      */
164     private fun runJobTemplateOnAWX(awxClient: BlueprintWebClientService, job_template_name: String?, jtId: String,
165                                     workflowPrefix : String) {
166         setNodeOutputProperties("preparing".asJsonPrimitive(), "".asJsonPrimitive(),  "".asJsonPrimitive())
167
168         // Get Job Template requirements
169         var response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}job_templates/${jtId}/launch/", "")
170         // FIXME: handle non-successful SC
171         val jtLaunchReqs: JsonNode = mapper.readTree(response.body)
172         val payload = prepareLaunchPayload(awxClient, jtLaunchReqs, workflowPrefix.isNotBlank())
173
174         log.info("Running job with $payload, for requestId $processId.")
175
176         // Launch the job for the targeted template
177         var jtLaunched: JsonNode = JacksonUtils.objectMapper.createObjectNode()
178         response = awxClient.exchangeResource(POST, "/api/v2/${workflowPrefix}job_templates/${jtId}/launch/", payload)
179         if (response.status in HTTP_SUCCESS) {
180             jtLaunched = mapper.readTree(response.body)
181             val fieldsIgnored: JsonNode = jtLaunched.at("/ignored_fields")
182             if (fieldsIgnored.rootFieldsToMap().isNotEmpty()) {
183                 log.warn("Ignored fields : $fieldsIgnored, for requestId $processId.")
184             }
185         }
186
187         if (response.status in HTTP_SUCCESS) {
188             val jobId: String = jtLaunched.at("/id").asText()
189
190             // Poll current job status while job is not executed
191             var jobStatus = "unknown"
192             var jobEndTime = "null"
193             while (jobEndTime == "null") {
194                 response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}jobs/${jobId}/", "")
195                 val jobLaunched: JsonNode = mapper.readTree(response.body)
196                 jobStatus = jobLaunched.at("/status").asText()
197                 jobEndTime = jobLaunched.at("/finished").asText()
198                 Thread.sleep(checkDelay)
199             }
200
201             log.info("Execution of job template $job_template_name in job #$jobId finished with status ($jobStatus) for requestId $processId")
202
203             populateJobRunResponse(awxClient, jobId, workflowPrefix, jobStatus)
204
205         } else {
206             // The job template requirements were not fulfilled with the values passed in. The message below will
207             // provide more information via the response, like the ignored_fields, or variables_needed_to_start,
208             // or resources_needed_to_start, in order to help user pinpoint the problems with the request.
209             val message = "Execution of job template $job_template_name could not be started for requestId $processId." +
210                     " (Response: ${response.body}) "
211             log.error(message)
212             setNodeOutputErrors(ATTRIBUTE_EXEC_CMD_STATUS_ERROR, message)
213         }
214     }
215
216     /**
217      * Extracts the output from either a job stdout call OR collects the workflow run output, as well as the artifacts
218      * and populate the component corresponding output properties
219      */
220     private fun populateJobRunResponse(awxClient: BlueprintWebClientService, jobId: String, workflowPrefix: String,
221                                        jobStatus: String) {
222
223         val collectedResponses = StringBuilder(4096)
224         val artifacts: MutableMap<String, JsonNode> = mutableMapOf()
225
226         collectJobIdsRelatedToJobRun(awxClient, jobId, workflowPrefix).forEach { aJobId ->
227
228             // Collect the response text from the corresponding jobIds
229             var response = awxClient.exchangeResource(GET, "/api/v2/jobs/${aJobId}/stdout/?format=txt", "", plainTextHeaders)
230             if (response.status in HTTP_SUCCESS) {
231                 val jobOutput = response.body
232                 collectedResponses
233                         .append("Output for Job $aJobId :" + System.lineSeparator())
234                         .append(jobOutput)
235                         .append(System.lineSeparator())
236                 log.info("Response for job ${aJobId}: \n ${jobOutput} \n")
237             } else {
238                 log.warn("Could not gather response for job ${aJobId}. Status=${response.status}")
239             }
240
241             // Collect artifacts variables from each job and gather them up in one json node
242             response = awxClient.exchangeResource(GET, "/api/v2/jobs/${aJobId}/", "")
243             if (response.status in HTTP_SUCCESS) {
244                 val jobArtifacts = mapper.readTree(response.body).at("/artifacts")
245                 if (jobArtifacts != null) {
246                     artifacts.putAll(jobArtifacts.rootFieldsToMap())
247                 }
248             }
249         }
250
251         log.info("Artifacts for job ${jobId}: \n $artifacts \n")
252
253         setNodeOutputProperties(jobStatus.asJsonPrimitive(), collectedResponses.toString().asJsonPrimitive(), artifacts.asJsonNode())
254     }
255
256     /**
257      * List all the job Ids for a give workflow, i.e. sub jobs, or the jobId if not a workflow instance
258      */
259     private fun collectJobIdsRelatedToJobRun(awxClient: BlueprintWebClientService, jobId: String, workflowPrefix: String): Array<String> {
260
261         var jobIds: Array<String>
262
263         if (workflowPrefix.isNotEmpty()) {
264             var response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}jobs/${jobId}/workflow_nodes/", "")
265             val jobDetails = mapper.readTree(response.body).at("/results")
266
267             // gather up job Id of all actual job nodes that ran during the workflow
268             jobIds = emptyArray()
269             for (jobDetail in jobDetails.elements()) {
270                 if (jobDetail.at("/do_not_run").asText() == "false") {
271                     jobIds = jobIds.plus(jobDetail.at("/summary_fields/job/id").asText())
272                 }
273             }
274         } else {
275             jobIds = arrayOf(jobId)
276         }
277         return jobIds
278     }
279
280     /**
281      * Prepares the JSON payload expected by the job template api,
282      * by applying the overrides that were provided
283      * and allowed by the template definition flags in jtLaunchReqs
284      */
285     private fun prepareLaunchPayload(awxClient: BlueprintWebClientService, jtLaunchReqs: JsonNode,
286                                      isWorkflow : Boolean): String {
287         val payload = JacksonUtils.objectMapper.createObjectNode()
288
289         // Parameter defaults
290         val inventoryProp = getOptionalOperationInput(INPUT_INVENTORY)
291         val extraArgs = getOperationInput(INPUT_EXTRA_VARS)
292
293         if (!isWorkflow) {
294             val limitProp = getOptionalOperationInput(INPUT_LIMIT_TO_HOST)
295             val tagsProp = getOptionalOperationInput(INPUT_TAGS)
296             val skipTagsProp = getOptionalOperationInput(INPUT_SKIP_TAGS)
297
298             val askLimitOnLaunch = jtLaunchReqs.at("/ask_limit_on_launch").asBoolean()
299             if (askLimitOnLaunch && limitProp.isNotNull()) {
300                 payload.set(INPUT_LIMIT_TO_HOST, limitProp)
301             }
302             val askTagsOnLaunch = jtLaunchReqs.at("/ask_tags_on_launch").asBoolean()
303             if (askTagsOnLaunch && tagsProp.isNotNull()) {
304                 payload.set(INPUT_TAGS, tagsProp)
305             }
306             if (askTagsOnLaunch && skipTagsProp.isNotNull()) {
307                 payload.set("skip_tags", skipTagsProp)
308             }
309         }
310
311         val askInventoryOnLaunch = jtLaunchReqs.at("/ask_inventory_on_launch").asBoolean()
312         if (askInventoryOnLaunch && inventoryProp.isNotNull()) {
313             var inventoryKeyId = if (inventoryProp is TextNode) {
314                 resolveInventoryIdByName(awxClient, inventoryProp.textValue())?.asJsonPrimitive()
315             } else {
316                 inventoryProp
317             }
318             payload.set(INPUT_INVENTORY, inventoryKeyId)
319         }
320
321             payload.set("extra_vars", extraArgs)
322
323         return payload.asJsonString(false)
324     }
325
326     private fun resolveInventoryIdByName(awxClient: BlueprintWebClientService, inventoryProp: String): Int? {
327         var invId: Int? = null
328
329         // Get Inventory by name
330         val encoded = URLEncoder.encode(inventoryProp)
331         val response = awxClient.exchangeResource(GET, "/api/v2/inventories/?name=$encoded", "")
332         if (response.status in HTTP_SUCCESS) {
333             // Extract the inventory ID from response
334             val invDetails = mapper.readTree(response.body)
335             val nbInvFound = invDetails.at("/count").asInt()
336             if (nbInvFound == 1) {
337                 invId = invDetails["results"][0]["id"].asInt()
338                 log.info("Resolved inventory $inventoryProp to ID #: $invId")
339             }
340         }
341
342         if (invId == null) {
343             val message = "Could not resolve inventory $inventoryProp by name..."
344             log.error(message)
345             throw IllegalArgumentException(message)
346         }
347
348         return invId
349     }
350
351     /**
352      * Utility function to set the output properties of the executor node
353      */
354     private fun setNodeOutputProperties(status: JsonNode, message: JsonNode, artifacts: JsonNode) {
355         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
356         log.info("Executor status   : $status")
357         setAttribute(ATTRIBUTE_EXEC_CMD_ARTIFACTS, artifacts)
358         log.info("Executor artifacts: $artifacts")
359         setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
360         log.info("Executor message  : $message")
361     }
362
363     /**
364      * Utility function to set the output properties and errors of the executor node, in cas of errors
365      */
366     private fun setNodeOutputErrors(status: String, message: String, artifacts: JsonNode = "".asJsonPrimitive()  ) {
367         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
368         setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message.asJsonPrimitive())
369         setAttribute(ATTRIBUTE_EXEC_CMD_ARTIFACTS, artifacts)
370
371         addError(status, ATTRIBUTE_EXEC_CMD_LOG, message)
372     }
373 }