9ea6034a8fc8089965cb42ca7b76551ee7c233c9
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / ansible-awx-executor / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / ansible / executor / ComponentRemoteAnsibleExecutor.kt
1 /*
2  *  Copyright © 2019 Bell Canada.
3  *  Modifications Copyright © 2018-2019 IBM.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.functions.ansible.executor
19
20 import com.fasterxml.jackson.databind.JsonNode
21 import com.fasterxml.jackson.databind.ObjectMapper
22 import com.fasterxml.jackson.databind.node.TextNode
23 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
24 import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BluePrintRestLibPropertyService
25 import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService
26 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
27 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonNode
28 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
29 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
30 import org.onap.ccsdk.cds.controllerblueprints.core.isNullOrMissing
31 import org.onap.ccsdk.cds.controllerblueprints.core.returnNullIfMissing
32 import org.onap.ccsdk.cds.controllerblueprints.core.rootFieldsToMap
33 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
34 import org.slf4j.LoggerFactory
35 import org.springframework.beans.factory.config.ConfigurableBeanFactory
36 import org.springframework.context.annotation.Scope
37 import org.springframework.http.HttpMethod
38 import org.springframework.stereotype.Component
39 import java.net.URI
40 import java.net.URLEncoder
41 import java.util.NoSuchElementException
42
43 /**
44  * ComponentRemoteAnsibleExecutor
45  *
46  * Component that launches a run of a job template (INPUT_JOB_TEMPLATE_NAME) representing an Ansible playbook,
47  * and its parameters, via the AWX server identified by the INPUT_ENDPOINT_SELECTOR parameter.
48  *
49  * It supports extra_vars, limit, tags, skip-tags, inventory (by name or Id) Ansible parameters.
50  * It reports the results of the execution via properties, named execute-command-status and execute-command-logs
51  *
52  * @author Serge Simard
53  */
54 @Component("component-remote-ansible-executor")
55 @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
56 open class ComponentRemoteAnsibleExecutor(
57     private val blueprintRestLibPropertyService: BluePrintRestLibPropertyService,
58     private val mapper: ObjectMapper
59 ) :
60     AbstractComponentFunction() {
61
62     // HTTP related constants
63     private val HTTP_SUCCESS = 200..202
64     private val GET = HttpMethod.GET.name
65     private val POST = HttpMethod.POST.name
66     private val plainTextHeaders = mapOf("Accept" to "text/plain")
67
68     var checkDelay: Long = 15_000
69
70     companion object {
71         private val log = LoggerFactory.getLogger(ComponentRemoteAnsibleExecutor::class.java)
72
73         // input fields names accepted by this executor
74         const val INPUT_ENDPOINT_SELECTOR = "endpoint-selector"
75         const val INPUT_JOB_TEMPLATE_NAME = "job-template-name"
76         const val INPUT_WORKFLOW_JOB_TEMPLATE_NAME = "workflow-job-template-id"
77         const val INPUT_LIMIT_TO_HOST = "limit"
78         const val INPUT_INVENTORY = "inventory"
79         const val INPUT_EXTRA_VARS = "extra-vars"
80         const val INPUT_TAGS = "tags"
81         const val INPUT_SKIP_TAGS = "skip-tags"
82
83         // output fields names (and values) populated by this executor; aligned with job details status field values.
84         const val ATTRIBUTE_EXEC_CMD_ARTIFACTS = "ansible-artifacts"
85         const val ATTRIBUTE_EXEC_CMD_STATUS = "ansible-command-status"
86         const val ATTRIBUTE_EXEC_CMD_LOG = "ansible-command-logs"
87         const val ATTRIBUTE_EXEC_CMD_STATUS_ERROR = "error"
88     }
89
90     override suspend fun processNB(executionRequest: ExecutionServiceInput) {
91
92         try {
93             val restClientService = getAWXRestClient()
94
95             // Get either a job template name or a workflow template name property
96             var workflowURIPrefix = ""
97             var jobTemplateName = getOperationInput(INPUT_JOB_TEMPLATE_NAME).returnNullIfMissing()?.textValue() ?: ""
98             val isWorkflowJT = jobTemplateName.isBlank()
99             if (isWorkflowJT) {
100                 jobTemplateName = getOperationInput(INPUT_WORKFLOW_JOB_TEMPLATE_NAME).asText()
101                 workflowURIPrefix = "workflow_"
102             }
103
104             val jtId = lookupJobTemplateIDByName(restClientService, jobTemplateName, workflowURIPrefix)
105             if (jtId.isNotEmpty()) {
106                 runJobTemplateOnAWX(restClientService, jobTemplateName, jtId, workflowURIPrefix)
107             } else {
108                 val message = "Workflow/Job template $jobTemplateName does not exists"
109                 log.error(message)
110                 setNodeOutputErrors(ATTRIBUTE_EXEC_CMD_STATUS_ERROR, message)
111             }
112         } catch (e: Exception) {
113             log.error("Failed to process on remote executor (${e.message})", e)
114             setNodeOutputErrors(ATTRIBUTE_EXEC_CMD_STATUS_ERROR, "Failed to process on remote executor (${e.message})")
115         }
116     }
117
118     override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) {
119         val message = "Error in ComponentRemoteAnsibleExecutor : ${runtimeException.message}"
120         log.error(message, runtimeException)
121         setNodeOutputErrors(ATTRIBUTE_EXEC_CMD_STATUS_ERROR, message)
122     }
123
124     /** Creates a TokenAuthRestClientService, since this executor expect type property to be "token-auth" and the
125      * token to be an OAuth token (access_token response field) generated via the AWX /api/o/token rest endpoint
126      * The token field is of the form "Bearer access_token_from_response", for example :
127      *  "blueprintsprocessor.restclient.awx.type=token-auth"
128      *  "blueprintsprocessor.restclient.awx.url=http://awx-endpoint"
129      *  "blueprintsprocessor.restclient.awx.token=Bearer J9gEtMDzxcqw25574fioY9VAhLDIs1"
130      *
131      * Also supports json endpoint definition via DSL entry, e.g.:
132      *     "ansible-remote-endpoint": {
133      *        "type": "token-auth",
134      *        "url": "http://awx-endpoint",
135      *        "token": "Bearer J9gEtMDzxcqw25574fioY9VAhLDIs1"
136      *     }
137      */
138     private fun getAWXRestClient(): BlueprintWebClientService {
139
140         val endpointSelector = getOperationInput(INPUT_ENDPOINT_SELECTOR)
141
142         try {
143             return blueprintRestLibPropertyService.blueprintWebClientService(endpointSelector)
144         } catch (e: NoSuchElementException) {
145             throw IllegalArgumentException("No value provided for input selector $endpointSelector", e)
146         }
147     }
148
149     /**
150      * Finds the job template ID based on the job template name provided in the request
151      */
152     private fun lookupJobTemplateIDByName(
153         awxClient: BlueprintWebClientService,
154         job_template_name: String?,
155         workflowPrefix: String
156     ): String {
157         val encodedJTName = URI(
158             null, null,
159             "/api/v2/${workflowPrefix}job_templates/$job_template_name/",
160             null, null
161         ).rawPath
162
163         // Get Job Template details by name
164         var response = awxClient.exchangeResource(GET, encodedJTName, "")
165         val jtDetails: JsonNode = mapper.readTree(response.body)
166         return jtDetails.at("/id").asText()
167     }
168
169     /**
170      * Performs the job template execution on AWX, ie. prepare arguments as per job template
171      * requirements (ask fields) and provided overriding values. Then it launches the run, and monitors
172      * its execution. Finally, it retrieves the job results via the stdout api.
173      * The status and output attributes are populated in the process.
174      */
175     private fun runJobTemplateOnAWX(
176         awxClient: BlueprintWebClientService,
177         job_template_name: String?,
178         jtId: String,
179         workflowPrefix: String
180     ) {
181         setNodeOutputProperties("preparing".asJsonPrimitive(), "".asJsonPrimitive(), "".asJsonPrimitive())
182
183         // Get Job Template requirements
184         var response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}job_templates/$jtId/launch/", "")
185         // FIXME: handle non-successful SC
186         val jtLaunchReqs: JsonNode = mapper.readTree(response.body)
187         val payload = prepareLaunchPayload(awxClient, jtLaunchReqs, workflowPrefix.isNotBlank())
188
189         log.info("Running job with $payload, for requestId $processId.")
190
191         // Launch the job for the targeted template
192         var jtLaunched: JsonNode = JacksonUtils.objectMapper.createObjectNode()
193         response = awxClient.exchangeResource(POST, "/api/v2/${workflowPrefix}job_templates/$jtId/launch/", payload)
194         if (response.status in HTTP_SUCCESS) {
195             jtLaunched = mapper.readTree(response.body)
196             val fieldsIgnored: JsonNode = jtLaunched.at("/ignored_fields")
197             if (fieldsIgnored.rootFieldsToMap().isNotEmpty()) {
198                 log.warn("Ignored fields : $fieldsIgnored, for requestId $processId.")
199             }
200         }
201
202         if (response.status in HTTP_SUCCESS) {
203             val jobId: String = jtLaunched.at("/id").asText()
204
205             // Poll current job status while job is not executed
206             var jobStatus = "unknown"
207             var jobEndTime = "null"
208             while (jobEndTime == "null") {
209                 response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}jobs/$jobId/", "")
210                 val jobLaunched: JsonNode = mapper.readTree(response.body)
211                 jobStatus = jobLaunched.at("/status").asText()
212                 jobEndTime = jobLaunched.at("/finished").asText()
213                 Thread.sleep(checkDelay)
214             }
215
216             log.info("Execution of job template $job_template_name in job #$jobId finished with status ($jobStatus) for requestId $processId")
217
218             populateJobRunResponse(awxClient, jobId, workflowPrefix, jobStatus)
219         } else {
220             // The job template requirements were not fulfilled with the values passed in. The message below will
221             // provide more information via the response, like the ignored_fields, or variables_needed_to_start,
222             // or resources_needed_to_start, in order to help user pinpoint the problems with the request.
223             val message = "Execution of job template $job_template_name could not be started for requestId $processId." +
224                     " (Response: ${response.body}) "
225             log.error(message)
226             setNodeOutputErrors(ATTRIBUTE_EXEC_CMD_STATUS_ERROR, message)
227         }
228     }
229
230     /**
231      * Extracts the output from either a job stdout call OR collects the workflow run output, as well as the artifacts
232      * and populate the component corresponding output properties
233      */
234     private fun populateJobRunResponse(
235         awxClient: BlueprintWebClientService,
236         jobId: String,
237         workflowPrefix: String,
238         jobStatus: String
239     ) {
240
241         val collectedResponses = StringBuilder(4096)
242         val artifacts: MutableMap<String, JsonNode> = mutableMapOf()
243
244         collectJobIdsRelatedToJobRun(awxClient, jobId, workflowPrefix).forEach { aJobId ->
245
246             // Collect the response text from the corresponding jobIds
247             var response = awxClient.exchangeResource(GET, "/api/v2/jobs/$aJobId/stdout/?format=txt", "", plainTextHeaders)
248             if (response.status in HTTP_SUCCESS) {
249                 val jobOutput = response.body
250                 collectedResponses
251                     .append("Output for Job $aJobId :" + System.lineSeparator())
252                     .append(jobOutput)
253                     .append(System.lineSeparator())
254                 log.info("Response for job $aJobId: \n $jobOutput \n")
255             } else {
256                 log.warn("Could not gather response for job $aJobId. Status=${response.status}")
257             }
258
259             // Collect artifacts variables from each job and gather them up in one json node
260             response = awxClient.exchangeResource(GET, "/api/v2/jobs/$aJobId/", "")
261             if (response.status in HTTP_SUCCESS) {
262                 val jobArtifacts = mapper.readTree(response.body).at("/artifacts")
263                 if (jobArtifacts != null) {
264                     artifacts.putAll(jobArtifacts.rootFieldsToMap())
265                 }
266             }
267         }
268
269         log.info("Artifacts for job $jobId: \n $artifacts \n")
270
271         setNodeOutputProperties(jobStatus.asJsonPrimitive(), collectedResponses.toString().asJsonPrimitive(), artifacts.asJsonNode())
272     }
273
274     /**
275      * List all the job Ids for a give workflow, i.e. sub jobs, or the jobId if not a workflow instance
276      */
277     private fun collectJobIdsRelatedToJobRun(awxClient: BlueprintWebClientService, jobId: String, workflowPrefix: String): Array<String> {
278
279         var jobIds: Array<String>
280
281         if (workflowPrefix.isNotEmpty()) {
282             var response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}jobs/$jobId/workflow_nodes/", "")
283             val jobDetails = mapper.readTree(response.body).at("/results")
284
285             // gather up job Id of all actual job nodes that ran during the workflow
286             jobIds = emptyArray()
287             for (jobDetail in jobDetails.elements()) {
288                 if (jobDetail.at("/do_not_run").asText() == "false") {
289                     jobIds = jobIds.plus(jobDetail.at("/summary_fields/job/id").asText())
290                 }
291             }
292         } else {
293             jobIds = arrayOf(jobId)
294         }
295         return jobIds
296     }
297
298     /**
299      * Prepares the JSON payload expected by the job template api,
300      * by applying the overrides that were provided
301      * and allowed by the template definition flags in jtLaunchReqs
302      */
303     private fun prepareLaunchPayload(
304         awxClient: BlueprintWebClientService,
305         jtLaunchReqs: JsonNode,
306         isWorkflow: Boolean
307     ): String {
308         val payload = JacksonUtils.objectMapper.createObjectNode()
309
310         // Parameter defaults
311         val inventoryProp = getOptionalOperationInput(INPUT_INVENTORY)
312         val extraArgs = getOperationInput(INPUT_EXTRA_VARS)
313
314         if (!isWorkflow) {
315             val limitProp = getOptionalOperationInput(INPUT_LIMIT_TO_HOST)
316             val tagsProp = getOptionalOperationInput(INPUT_TAGS)
317             val skipTagsProp = getOptionalOperationInput(INPUT_SKIP_TAGS)
318
319             val askLimitOnLaunch = jtLaunchReqs.at("/ask_limit_on_launch").asBoolean()
320             if (askLimitOnLaunch && !limitProp.isNullOrMissing()) {
321                 payload.set(INPUT_LIMIT_TO_HOST, limitProp)
322             }
323             val askTagsOnLaunch = jtLaunchReqs.at("/ask_tags_on_launch").asBoolean()
324             if (askTagsOnLaunch && !tagsProp.isNullOrMissing()) {
325                 payload.set(INPUT_TAGS, tagsProp)
326             }
327             if (askTagsOnLaunch && !skipTagsProp.isNullOrMissing()) {
328                 payload.set("skip_tags", skipTagsProp)
329             }
330         }
331
332         val askInventoryOnLaunch = jtLaunchReqs.at("/ask_inventory_on_launch").asBoolean()
333         if (askInventoryOnLaunch && !inventoryProp.isNullOrMissing()) {
334             var inventoryKeyId = if (inventoryProp is TextNode) {
335                 resolveInventoryIdByName(awxClient, inventoryProp.textValue())?.asJsonPrimitive()
336             } else {
337                 inventoryProp
338             }
339             payload.set(INPUT_INVENTORY, inventoryKeyId)
340         }
341
342         payload.set("extra_vars", extraArgs)
343
344         return payload.asJsonString(false)
345     }
346
347     private fun resolveInventoryIdByName(awxClient: BlueprintWebClientService, inventoryProp: String): Int? {
348         var invId: Int? = null
349
350         // Get Inventory by name
351         val encoded = URLEncoder.encode(inventoryProp)
352         val response = awxClient.exchangeResource(GET, "/api/v2/inventories/?name=$encoded", "")
353         if (response.status in HTTP_SUCCESS) {
354             // Extract the inventory ID from response
355             val invDetails = mapper.readTree(response.body)
356             val nbInvFound = invDetails.at("/count").asInt()
357             if (nbInvFound == 1) {
358                 invId = invDetails["results"][0]["id"].asInt()
359                 log.info("Resolved inventory $inventoryProp to ID #: $invId")
360             }
361         }
362
363         if (invId == null) {
364             val message = "Could not resolve inventory $inventoryProp by name..."
365             log.error(message)
366             throw IllegalArgumentException(message)
367         }
368
369         return invId
370     }
371
372     /**
373      * Utility function to set the output properties of the executor node
374      */
375     private fun setNodeOutputProperties(status: JsonNode, message: JsonNode, artifacts: JsonNode) {
376         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
377         log.info("Executor status   : $status")
378         setAttribute(ATTRIBUTE_EXEC_CMD_ARTIFACTS, artifacts)
379         log.info("Executor artifacts: $artifacts")
380         setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
381         log.info("Executor message  : $message")
382     }
383
384     /**
385      * Utility function to set the output properties and errors of the executor node, in cas of errors
386      */
387     private fun setNodeOutputErrors(status: String, message: String, artifacts: JsonNode = "".asJsonPrimitive()) {
388         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
389         setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message.asJsonPrimitive())
390         setAttribute(ATTRIBUTE_EXEC_CMD_ARTIFACTS, artifacts)
391
392         addError(status, ATTRIBUTE_EXEC_CMD_LOG, message)
393     }
394 }