1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
20 # Lifecycle interface calls for containerized components
# Needed by Cloudify Manager to load google.auth for the Kubernetes python client
from . import cloudify_importer

# Standard library
import copy
import json
import time

# Cloudify framework and DCAE libraries
from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
from onap_dcae_dcaepolicy_lib import Policies

# Project-local modules
import k8sclient  # NOTE(review): confirm module path; used as k8sclient.deploy/undeploy/... below
from configure import configure
from k8splugin import discovery as dis
from k8splugin import utils
from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
    merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
from k8splugin.exceptions import DockerPluginDeploymentError
# Plugin configuration, read once at module load time
plugin_conf = configure.configure()
CONSUL_HOST = plugin_conf.get("consul_host")
CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
DCAE_NAMESPACE = plugin_conf.get("namespace")
DEFAULT_MAX_WAIT = plugin_conf.get("max_wait")
DEFAULT_K8S_LOCATION = plugin_conf.get("default_k8s_location")
COMPONENT_CERT_DIR = plugin_conf.get("tls", {}).get("component_cert_dir")
# Use an empty-dict default (matching the "tls" lookup above) so a missing
# "cbs" section yields None instead of raising AttributeError at import time
CBS_BASE_URL = plugin_conf.get("cbs", {}).get("base_url")

# Used to construct delivery urls for data router subscribers. Data router in FTL
# requires https but this author believes that ONAP is to be defaulted to http.
DEFAULT_SCHEME = "http"

# Property keys shared between operations / runtime_properties
SERVICE_COMPONENT_NAME = "service_component_name"
CONTAINER_ID = "container_id"
APPLICATION_CONFIG = "application_config"
K8S_DEPLOYMENT = "k8s_deployment"
RESOURCE_KW = "resource_config"
LOCATION_ID = "location_id"
63 # Lifecycle interface calls for dcae.nodes.DockerContainer
def _setup_for_discovery(**kwargs):
    """Setup for config discovery.

    Pushes the component's application configuration (kwargs[APPLICATION_CONFIG])
    into Consul under the component name (kwargs['name']).
    Returns kwargs so calls can be chained with ** expansion.

    Raises:
        RecoverableError: on Consul connection problems (retryable).
        NonRecoverableError: on any other failure.
    """
    try:
        name = kwargs['name']
        application_config = kwargs[APPLICATION_CONFIG]
        # NOTE: application_config is no longer a json string and is inputed as a
        # YAML map which translates to a dict. We don't have to do any
        # preprocessing anymore.
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.push_service_component_config(conn, name, application_config)
        return kwargs
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)
    except Exception as e:
        ctx.logger.error("Unexpected error while pushing configuration: {0}"
                         .format(str(e)))
        raise NonRecoverableError(e)
84 def _generate_component_name(**kwargs):
85 """Generate component name"""
86 service_component_type = kwargs['service_component_type']
87 name_override = kwargs['service_component_name_override']
89 kwargs['name'] = name_override if name_override \
90 else dis.generate_service_component_name(service_component_type)
def _done_for_create(**kwargs):
    """Wrap up create operation.

    Records the component name and all accumulated kwargs into
    runtime_properties. Returns kwargs so calls can be chained.
    """
    name = kwargs['name']
    kwargs[SERVICE_COMPONENT_NAME] = name
    # All updates to the runtime_properties happens here. I don't see a reason
    # why we shouldn't do this because the context is not being mutated by
    # something else and will keep the other functions pure (pure in the sense
    # not dealing with CloudifyContext).
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done setting up: {0}".format(name))
    return kwargs
def _get_resources(**kwargs):
    """Return the resource_config entry from kwargs, or None when absent.

    Note: with a **kwargs signature, kwargs is always a dict, so the
    None-check below can never be False; the fallback branch is kept for
    defensive symmetry only.
    """
    if kwargs is not None:
        ctx.logger.debug("{0}: {1}".format(RESOURCE_KW, kwargs.get(RESOURCE_KW)))
        return kwargs.get(RESOURCE_KW)
    ctx.logger.info("set resources to None")
    return None
def _get_location():
    ''' Get the k8s location property. Set to the default if the property is missing, None, or zero-length '''
    return ctx.node.properties["location_id"] if "location_id" in ctx.node.properties and ctx.node.properties["location_id"] \
        else DEFAULT_K8S_LOCATION
@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components(**create_inputs):
    """Create step for service components

    This interface is responsible for:

    1. Generating service component name
    2. Populating config information into Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **_enhance_docker_params(
                **_generate_component_name(
                    **create_inputs))))
def _parse_streams(**kwargs):
    """Parse streams and setup for DMaaP plugin.

    Copies each publish/subscribe stream into kwargs keyed by the stream's
    node name (required by the DMaaP plugin). For data router subscribers,
    builds the delivery URL and generates credentials when absent.
    Returns kwargs so calls can be chained.
    """
    # The DMaaP plugin requires this plugin to set the runtime properties
    # keyed by the node name.
    for stream in kwargs["streams_publishes"]:
        kwargs[stream["name"]] = stream

    for stream in kwargs["streams_subscribes"]:
        if stream["type"] == "data_router":

            # Don't want to mutate the source
            stream = copy.deepcopy(stream)

            # Set up the delivery URL
            # Using service_component_name as the host name in the subscriber URL
            # will work in a single-cluster ONAP deployment. Whether it will also work
            # in a multi-cluster ONAP deployment--with a central location and one or
            # more remote ("edge") locations depends on how networking and DNS is set
            # up in a multi-cluster deployment
            service_component_name = kwargs["name"]
            ports, _ = k8sclient.parse_ports(kwargs["ports"])
            # NOTE(review): assumes parse_ports yields entries whose first
            # element is the container port -- confirm against k8sclient.parse_ports
            (dport, _) = ports[0]
            subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)

            scheme = stream.get("scheme", DEFAULT_SCHEME)
            if "route" not in stream:
                raise NonRecoverableError("'route' key missing from data router subscriber")
            path = stream["route"]
            stream["delivery_url"] = "{scheme}://{host}/{path}".format(
                scheme=scheme, host=subscriber_host, path=path)

            # If username and password has not been provided then generate it. The
            # DMaaP plugin doesn't generate for subscribers. The generation code
            # and length of username password has been lifted from the DMaaP
            # plugin.
            if not stream.get("username", None):
                stream["username"] = utils.random_string(8)
            if not stream.get("password", None):
                stream["password"] = utils.random_string(10)

        kwargs[stream["name"]] = stream

    return kwargs
@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components_with_streams(**create_inputs):
    """Create step for service components that use DMaaP

    This interface is responsible for:

    1. Generating service component name
    2. Setup runtime properties for DMaaP plugin
    3. Populating application config into Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **_parse_streams(
                **_enhance_docker_params(
                    **_generate_component_name(
                        **create_inputs)))))
def _verify_k8s_deployment(location, service_component_name, max_wait):
    """Verify that the k8s Deployment is ready

    Args:
    -----
    location (string): location of the k8s cluster where the component was deployed
    service_component_name: component's service component name
    max_wait (integer): limit to how may attempts to make which translates to
        seconds because each sleep is one second. 0 means infinite.

    Return:
    -------
    True if deployment is ready within the maximum wait time, False otherwise
    """
    num_attempts = 1

    while True:
        if k8sclient.is_available(location, DCAE_NAMESPACE, service_component_name):
            return True
        else:
            num_attempts += 1

            if max_wait > 0 and max_wait < num_attempts:
                # Exceeded the attempt budget without the Deployment going ready
                return False

            # Poll at one-second intervals so max_wait approximates seconds
            time.sleep(1)
def _create_and_start_container(container_name, image, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, a k8s Service or two.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    kwargs may have:
        - volumes: array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
        - ports: array of strings in the form "container_port:host_port"
        - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
        - always_pull: boolean. If true, sets image pull policy to "Always"
          so that a fresh copy of the image is always pull. Otherwise, sets
          image pull policy to "IfNotPresent"
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
        - tls_info: an object with information for setting up the component to act as a TLS server, with the form:
            {"use_tls" : true_or_false, "cert_directory": "/path/to/directory_where_certs_should_be_placed" }
        - replicas: number of replicas to be launched initially
        - readiness: object with information needed to create a readiness check
        - liveness: object with information needed to create a liveness check
        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
    '''
    tls_info = kwargs.get("tls_info") or {}
    cert_dir = tls_info.get("cert_directory") or COMPONENT_CERT_DIR
    # Environment passed to every component container: Consul access, CBS access,
    # and the CA cert path used for TLS-enabled components
    env = {"CONSUL_HOST": CONSUL_INTERNAL_NAME,
           "CONFIG_BINDING_SERVICE": "config-binding-service",
           "DCAE_CA_CERTPATH": "{0}/cacert.pem".format(cert_dir),
           "CBS_CONFIG_URL": "{0}/{1}".format(CBS_BASE_URL, container_name)
           }
    env.update(kwargs.get("envs", {}))
    ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
    ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
    replicas = kwargs.get("replicas", 1)
    resource_config = _get_resources(**kwargs)
    _, dep = k8sclient.deploy(DCAE_NAMESPACE,
                              container_name,
                              image,
                              replicas=replicas,
                              always_pull=kwargs.get("always_pull_image", False),
                              k8sconfig=plugin_conf,
                              resources=resource_config,
                              volumes=kwargs.get("volumes", []),
                              ports=kwargs.get("ports", []),
                              tls_info=kwargs.get("tls_info"),
                              env=env,
                              labels=kwargs.get("labels", {}),
                              log_info=kwargs.get("log_info"),
                              readiness=kwargs.get("readiness"),
                              liveness=kwargs.get("liveness"),
                              k8s_location=kwargs.get("k8s_location"))

    # Capture the result of deployment for future use
    ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep
    kwargs[K8S_DEPLOYMENT] = dep
    ctx.instance.runtime_properties["replicas"] = replicas
    ctx.logger.info ("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))
    return kwargs
def _parse_cloudify_context(**kwargs):
    """Parse Cloudify context

    Extract what is needed. This is impure function because it requires ctx.
    Returns kwargs so calls can be chained.
    """
    kwargs["deployment_id"] = ctx.deployment.id

    # Set some labels for the Kubernetes pods
    # The name segment is required and must be 63 characters or less
    kwargs["labels"] = {
        "cfydeployment": ctx.deployment.id,
        "cfynode": ctx.node.name[:63],
        "cfynodeinstance": ctx.instance.id[:63]
    }

    # Pick up the centralized logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up TLS info if present
    if "tls_info" in ctx.node.properties:
        kwargs["tls_info"] = ctx.node.properties["tls_info"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    # Determine which Kubernetes cluster (location) the component lands in
    kwargs["k8s_location"] = _get_location()

    return kwargs
324 def _enhance_docker_params(**kwargs):
326 Set up Docker environment variables and readiness/liveness check info
327 and inject into kwargs.
330 # Get info for setting up readiness/liveness probe, if present
331 docker_config = kwargs.get("docker_config", {})
332 if "healthcheck" in docker_config:
333 kwargs["readiness"] = docker_config["healthcheck"]
334 if "livehealthcheck" in docker_config:
335 kwargs["liveness"] = docker_config["livehealthcheck"]
337 envs = kwargs.get("envs", {})
339 kwargs["envs"] = envs
341 def combine_params(key, docker_config, kwargs):
342 v = docker_config.get(key, []) + kwargs.get(key, [])
346 # Add the lists of ports and volumes unintelligently - meaning just add the
347 # lists together with no deduping.
348 kwargs = combine_params("ports", docker_config, kwargs)
349 kwargs = combine_params("volumes", docker_config, kwargs)
351 # Merge env vars from kwarg inputs and docker_config
352 kwargs["envs"].update(docker_config.get("envs", {}))
def _create_and_start_component(**kwargs):
    """Create and start component (container). Returns kwargs for chaining."""
    image = kwargs["image"]
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    # Need to be picky and manually select out pieces because just using kwargs
    # which contains everything confused the execution of
    # _create_and_start_container because duplicate variables exist
    sub_kwargs = {
        # Pass the replica count through so a blueprint-specified count is
        # honored; defaults to 1 as _create_and_start_container would.
        "replicas": kwargs.get("replicas", 1),
        "volumes": kwargs.get("volumes", []),
        "ports": kwargs.get("ports", None),
        "envs": kwargs.get("envs", {}),
        "log_info": kwargs.get("log_info", {}),
        "tls_info": kwargs.get("tls_info", {}),
        "labels": kwargs.get("labels", {}),
        "resource_config": kwargs.get("resource_config", {}),
        "readiness": kwargs.get("readiness", {}),
        "liveness": kwargs.get("liveness", {}),
        "k8s_location": kwargs.get("k8s_location")}
    returned_args = _create_and_start_container(service_component_name, image, **sub_kwargs)
    kwargs[K8S_DEPLOYMENT] = returned_args[K8S_DEPLOYMENT]

    return kwargs
def _verify_component(**kwargs):
    """Verify deployment is ready.

    Waits up to max_wait seconds for the Deployment to become ready. On
    success returns kwargs for chaining; on timeout tears down the k8s
    artifacts and Consul config, then raises DockerPluginDeploymentError.
    """
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]

    max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
    ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))

    if _verify_k8s_deployment(kwargs.get("k8s_location"), service_component_name, max_wait):
        ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
        return kwargs

    # The component did not become ready within the "max_wait" interval.
    # Delete the k8s components created already and remove configuration from Consul.
    ctx.logger.error("k8s deployment never became ready for {0}".format(service_component_name))
    if (K8S_DEPLOYMENT in kwargs) and (len(kwargs[K8S_DEPLOYMENT]["deployment"]) > 0):
        ctx.logger.info("attempting to delete k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
        k8sclient.undeploy(kwargs[K8S_DEPLOYMENT])
        ctx.logger.info("deleted k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
    cleanup_discovery(**kwargs)
    raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))
def _done_for_start(**kwargs):
    """Record start results into runtime_properties; return kwargs for chaining."""
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
    return kwargs
@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
    """Initiate Kubernetes deployment for service components

    This operation method is to be used with the ContainerizedServiceComponent
    node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
    that the app is up and responding successfully to readiness probes.
    """
    _done_for_start(
        **_verify_component(
            **_create_and_start_component(
                **_parse_cloudify_context(**start_inputs))))
@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
    """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type"""
    service_component_name = ctx.node.properties["name"]
    ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name

    image = ctx.node.properties["image"]
    kwargs["k8s_location"] = _get_location()

    _create_and_start_container(service_component_name, image, **kwargs)
@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
    """Delete Kubernetes deployment"""
    if K8S_DEPLOYMENT in ctx.instance.runtime_properties:
        try:
            deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
            k8sclient.undeploy(deployment_description)
        except Exception as e:
            # Best-effort teardown: log and continue so the uninstall
            # workflow is not aborted by a failed undeploy
            ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
                             .format(str(e)))
    else:
        # A previous install workflow may have failed,
        # and no Kubernetes deployment info was recorded in runtime_properties.
        # No need to run the undeploy operation
        ctx.logger.info("No k8s deployment information, not attempting to delete k8s deployment")
@wrap_error_handling_update
@monkeypatch_loggers
@operation
def scale(replicas, **kwargs):
    """Change number of replicas in the deployment"""
    service_component_name = ctx.instance.runtime_properties["service_component_name"]
    if replicas > 0:
        current_replicas = ctx.instance.runtime_properties["replicas"]
        ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        k8sclient.scale(deployment_description, replicas)
        ctx.instance.runtime_properties["replicas"] = replicas

        # Verify that the scaling took place as expected
        max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
        ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
        if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
            ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
    else:
        # Scaling to zero is not supported; leave the deployment as-is
        ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))
@wrap_error_handling_update
@monkeypatch_loggers
@operation
def update_image(image, **kwargs):
    """ Restart component with a new Docker image """

    service_component_name = ctx.instance.runtime_properties["service_component_name"]
    if image:
        current_image = ctx.instance.runtime_properties["image"]
        ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        k8sclient.upgrade(deployment_description, image)
        ctx.instance.runtime_properties["image"] = image

        # Verify that the update took place as expected
        max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
        ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
        if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
            ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
    else:
        # Empty/None image: nothing sensible to upgrade to
        ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))
500 #TODO: implement rollback operation when kubernetes python client fix is available.
501 # (See comments in k8sclient.py.)
502 # In the meantime, it's possible to undo an update_image operation by doing a second
503 # update_image that specifies the older image.
@monkeypatch_loggers
@Policies.cleanup_policies_on_node
@operation
def cleanup_discovery(**kwargs):
    """Delete configuration from Consul"""
    if SERVICE_COMPONENT_NAME in ctx.instance.runtime_properties:
        service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]

        try:
            conn = dis.create_kv_conn(CONSUL_HOST)
            dis.remove_service_component_config(conn, service_component_name)
        except dis.DiscoveryConnectionError as e:
            # Connection problems are retryable by Cloudify
            raise RecoverableError(e)
    else:
        # When another node in the blueprint fails install,
        # this node may not have generated a service component name.
        # There's nothing to delete from Consul.
        ctx.logger.info ("No service_component_name, not attempting to delete config from Consul")
def _notify_container(**kwargs):
    """
    Notify container using the policy section in the docker_config.
    Notification consists of running a script in the application container
    in each pod in the Kubernetes deployment associated with this node.
    Return the list of notification results.
    """
    dc = kwargs["docker_config"]
    resp = []

    if "policy" in dc and dc["policy"].get("trigger_type") == "docker":
        # Build the command to execute in the container
        # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...}
        script_path = dc["policy"]["script_path"]
        policy_data = {
            "policies": kwargs["policies"],
            "updated_policies": kwargs["updated_policies"],
            "removed_policies": kwargs["removed_policies"]
        }

        command = [script_path, "policies", json.dumps(policy_data)]

        # Execute the command
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        resp = k8sclient.execute_command_in_deployment(deployment_description, command)

    # else the default is no trigger

    return resp
@monkeypatch_loggers
@Policies.update_policies_on_node()
def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
    """Policy update task

    This method is responsible for updating the application configuration and
    notifying the applications that the change has occurred. This is to be used
    for the dcae.interfaces.policy.policy_update operation.

    :updated_policies: contains the list of changed policy-configs when configs_only=True
        (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
    """
    service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
    ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
                    .format(service_component_name, updated_policies, removed_policies, policies))
    # Deep-copy so the notification payload cannot mutate runtime_properties
    update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
    update_inputs["updated_policies"] = updated_policies
    update_inputs["removed_policies"] = removed_policies
    update_inputs["policies"] = policies

    resp = _notify_container(**update_inputs)
    ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name,json.dumps(resp)))