# ============LICENSE_START=======================================================
# ================================================================================
# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
# Lifecycle interface calls for containerized components

# Needed by Cloudify Manager to load google.auth for the Kubernetes python client
import cloudify_importer

import copy
import time

from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
from onap_dcae_dcaepolicy_lib import Policies
from k8splugin import discovery as dis
from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
    merge_inputs_for_start, merge_inputs_for_create
from k8splugin.exceptions import DockerPluginDeploymentError
from k8splugin import utils
from configure import configure
from k8sclient import k8sclient
40 plugin_conf = configure.configure()
41 CONSUL_HOST = plugin_conf.get("consul_host")
42 CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
43 DCAE_NAMESPACE = plugin_conf.get("namespace")
45 # Used to construct delivery urls for data router subscribers. Data router in FTL
46 # requires https but this author believes that ONAP is to be defaulted to http.
47 DEFAULT_SCHEME = "http"
50 SERVICE_COMPONENT_NAME = "service_component_name"
51 CONTAINER_ID = "container_id"
52 APPLICATION_CONFIG = "application_config"
# Lifecycle interface calls for dcae.nodes.DockerContainer
60 def _setup_for_discovery(**kwargs):
61 """Setup for config discovery"""
64 application_config = kwargs[APPLICATION_CONFIG]
66 # NOTE: application_config is no longer a json string and is inputed as a
67 # YAML map which translates to a dict. We don't have to do any
68 # preprocessing anymore.
69 conn = dis.create_kv_conn(CONSUL_HOST)
70 dis.push_service_component_config(conn, name, application_config)
72 except dis.DiscoveryConnectionError as e:
73 raise RecoverableError(e)
74 except Exception as e:
75 ctx.logger.error("Unexpected error while pushing configuration: {0}"
77 raise NonRecoverableError(e)
79 def _generate_component_name(**kwargs):
80 """Generate component name"""
81 service_component_type = kwargs['service_component_type']
82 name_override = kwargs['service_component_name_override']
84 kwargs['name'] = name_override if name_override \
85 else dis.generate_service_component_name(service_component_type)
88 def _done_for_create(**kwargs):
89 """Wrap up create operation"""
91 kwargs[SERVICE_COMPONENT_NAME] = name
92 # All updates to the runtime_properties happens here. I don't see a reason
93 # why we shouldn't do this because the context is not being mutated by
94 # something else and will keep the other functions pure (pure in the sense
95 # not dealing with CloudifyContext).
96 ctx.instance.runtime_properties.update(kwargs)
97 ctx.logger.info("Done setting up: {0}".format(name))
101 @merge_inputs_for_create
103 @Policies.gather_policies_to_node()
105 def create_for_components(**create_inputs):
106 """Create step for Docker containers that are components
108 This interface is responsible for:
110 1. Generating service component name
111 2. Populating config information into Consul
114 **_setup_for_discovery(
115 **_generate_component_name(
119 def _parse_streams(**kwargs):
120 """Parse streams and setup for DMaaP plugin"""
121 # The DMaaP plugin requires this plugin to set the runtime properties
122 # keyed by the node name.
123 def setup_publishes(s):
124 kwargs[s["name"]] = s
126 map(setup_publishes, kwargs["streams_publishes"])
128 def setup_subscribes(s):
129 if s["type"] == "data_router":
130 # If username and password has been provided then generate it. The
131 # DMaaP plugin doesn't generate for subscribers. The generation code
132 # and length of username password has been lifted from the DMaaP
135 # Don't want to mutate the source
137 if not s.get("username", None):
138 s["username"] = utils.random_string(8)
139 if not s.get("password", None):
140 s["password"] = utils.random_string(10)
142 kwargs[s["name"]] = s
144 # NOTE: That the delivery url is constructed and setup in the start operation
145 map(setup_subscribes, kwargs["streams_subscribes"])
149 def _setup_for_discovery_streams(**kwargs):
150 """Setup for discovery of streams
152 Specifically, there's a race condition this call addresses for data router
153 subscriber case. The component needs its feed subscriber information but the
154 DMaaP plugin doesn't provide this until after the docker plugin start
157 dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
158 if s["type"] == "data_router"]
161 dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
162 conn = dis.create_kv_conn(CONSUL_HOST)
164 def add_feed(dr_sub):
165 # delivery url and subscriber id will be fill by the dmaap plugin later
166 v = { "location": dr_sub["location"], "delivery_url": None,
167 "username": dr_sub["username"], "password": dr_sub["password"],
168 "subscriber_id": None }
169 return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None
172 if all(map(add_feed, dr_subs)):
174 except Exception as e:
175 raise NonRecoverableError(e)
177 # You should never get here
178 raise NonRecoverableError("Failure updating feed streams in Consul")
183 @merge_inputs_for_create
185 @Policies.gather_policies_to_node()
187 def create_for_components_with_streams(**create_inputs):
188 """Create step for Docker containers that are components that use DMaaP
190 This interface is responsible for:
192 1. Generating service component name
193 2. Setup runtime properties for DMaaP plugin
194 3. Populating application config into Consul
195 4. Populating DMaaP config for data router subscribers in Consul
198 **_setup_for_discovery(
199 **_setup_for_discovery_streams(
201 **_generate_component_name(
205 @merge_inputs_for_create
208 def create_for_platforms(**create_inputs):
209 """Create step for Docker containers that are platform components
211 This interface is responible for:
213 1. Populating config information into Consul
216 **_setup_for_discovery(
220 def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
222 conn = dis.create_kv_conn(consul_host)
223 results = dis.lookup_service(conn, service_component_name)
228 return "{address}:{port}".format(address=result["ServiceAddress"],
229 port=result["ServicePort"])
231 return results[0]["ServiceAddress"]
234 def _verify_container(service_component_name, max_wait):
235 """Verify that the container is healthy
239 max_wait (integer): limit to how may attempts to make which translates to
240 seconds because each sleep is one second. 0 means infinite.
244 True if component is healthy else a DockerPluginDeploymentError exception
250 if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
255 if max_wait > 0 and max_wait < num_attempts:
256 raise DockerPluginDeploymentError("Container never became healthy")
262 def _create_and_start_container(container_name, image, **kwargs):
264 This will create a k8s Deployment and, if needed, a k8s Service or two.
265 (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
266 We're not exposing k8s to the component developer and the blueprint author.
267 This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
268 the details from the component developer and the blueprint author.)
271 - volumes: array of volume objects, where a volume object is:
272 {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
273 - ports: array of strings in the form "container_port:host_port"
274 - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
275 - always_pull: boolean. If true, sets image pull policy to "Always"
276 so that a fresh copy of the image is always pull. Otherwise, sets
277 image pull policy to "IfNotPresent"
278 - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
279 - log_info: an object with info for setting up ELK logging, with the form:
280 {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
281 - replicas: number of replicas to be launched initially
282 - readiness: object with information needed to create a readiness check
284 env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
285 "CONFIG_BINDING_SERVICE": "config-binding-service" }
286 env.update(kwargs.get("envs", {}))
287 ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
288 ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
289 replicas = kwargs.get("replicas", 1)
290 _,dep = k8sclient.deploy(DCAE_NAMESPACE,
294 always_pull=kwargs.get("always_pull_image", False),
295 k8sconfig=plugin_conf,
296 volumes=kwargs.get("volumes",[]),
297 ports=kwargs.get("ports",[]),
298 msb_list=kwargs.get("msb_list"),
300 labels = kwargs.get("labels", {}),
301 log_info=kwargs.get("log_info"),
302 readiness=kwargs.get("readiness"))
304 # Capture the result of deployment for future use
305 ctx.instance.runtime_properties["k8s_deployment"] = dep
306 ctx.instance.runtime_properties["replicas"] = replicas
307 ctx.logger.info ("Deployment complete: {0}".format(dep))
309 def _parse_cloudify_context(**kwargs):
310 """Parse Cloudify context
312 Extract what is needed. This is impure function because it requires ctx.
314 kwargs["deployment_id"] = ctx.deployment.id
316 # Set some labels for the Kubernetes pods
318 "cfydeployment" : ctx.deployment.id,
319 "cfynode": ctx.node.name,
320 "cfynodeinstance": ctx.instance.id
323 # Pick up the centralized logging info
324 if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
325 kwargs["log_info"] = ctx.node.properties["log_info"]
327 # Pick up replica count and always_pull_image flag
328 if "replicas" in ctx.node.properties:
329 kwargs["replicas"] = ctx.node.properties["replicas"]
330 if "always_pull_image" in ctx.node.properties:
331 kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
335 def _enhance_docker_params(**kwargs):
337 Set up Docker environment variables and readiness check info
338 and inject into kwargs.
341 # Get info for setting up readiness probe, if present
342 docker_config = kwargs.get("docker_config", {})
343 if "healthcheck" in docker_config:
344 kwargs["readiness"] = docker_config["healthcheck"]
346 envs = kwargs.get("envs", {})
348 # Set tags on this component for its Consul registration as a service
349 tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
350 tags = [ str(tag) for tag in tags if tag is not None ]
351 # Registrator will use this to register this component with tags. Must be
353 envs["SERVICE_TAGS"] = ",".join(tags)
355 kwargs["envs"] = envs
357 def combine_params(key, docker_config, kwargs):
358 v = docker_config.get(key, []) + kwargs.get(key, [])
363 # Add the lists of ports and volumes unintelligently - meaning just add the
364 # lists together with no deduping.
365 kwargs = combine_params("ports", docker_config, kwargs)
366 kwargs = combine_params("volumes", docker_config, kwargs)
371 def _create_and_start_component(**kwargs):
372 """Create and start component (container)"""
373 image = kwargs["image"]
374 service_component_name = kwargs[SERVICE_COMPONENT_NAME]
375 # Need to be picky and manually select out pieces because just using kwargs
376 # which contains everything confused the execution of
377 # _create_and_start_container because duplicate variables exist
379 "volumes": kwargs.get("volumes", []),
380 "ports": kwargs.get("ports", None),
381 "envs": kwargs.get("envs", {}),
382 "log_info": kwargs.get("log_info", {}),
383 "labels": kwargs.get("labels", {}),
384 "readiness": kwargs.get("readiness",{})}
385 _create_and_start_container(service_component_name, image, **sub_kwargs)
387 # TODO: Use regular logging here
388 ctx.logger.info("Container started: {0}".format(service_component_name))
392 def _verify_component(**kwargs):
393 """Verify component (container) is healthy"""
394 service_component_name = kwargs[SERVICE_COMPONENT_NAME]
395 # TODO: "Consul doesn't make its first health check immediately upon registration.
396 # Instead it waits for the health check interval to pass."
397 # Possible enhancement is to read the interval (and possibly the timeout) from
398 # docker_config and multiply that by a number to come up with a more suitable
401 max_wait = kwargs.get("max_wait", 300)
403 # Verify that the container is healthy
405 if _verify_container(service_component_name, max_wait):
406 service_component_name = kwargs[SERVICE_COMPONENT_NAME]
408 # TODO: Use regular logging here
409 ctx.logger.info("Container is healthy: {0}".format(service_component_name))
413 def _done_for_start(**kwargs):
414 ctx.instance.runtime_properties.update(kwargs)
415 ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
418 def _setup_msb_registration(service_name, msb_reg):
420 "serviceName" : service_name,
421 "port" : msb_reg.get("port", "80"),
422 "version" : msb_reg.get("version", "v1"),
423 "url" : msb_reg.get("url_path", "/v1"),
425 "enable_ssl" : msb_reg.get("uses_ssl", False),
429 @wrap_error_handling_start
430 @merge_inputs_for_start
433 def create_and_start_container_for_components(**start_inputs):
434 """Create Docker container and start for components
436 This operation method is to be used with the DockerContainerForComponents
437 node type. After launching the container, the plugin will verify with Consul
438 that the app is up and healthy before terminating.
442 **_create_and_start_component(
443 **_enhance_docker_params(
444 **_parse_cloudify_context(**start_inputs)))))
447 def _update_delivery_url(**kwargs):
448 """Update the delivery url for data router subscribers"""
449 dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
450 if s["type"] == "data_router"]
453 service_component_name = kwargs[SERVICE_COMPONENT_NAME]
454 # TODO: Should NOT be setting up the delivery url with ip addresses
455 # because in the https case, this will not work because data router does
456 # a certificate validation using the fqdn.
457 subscriber_host = _lookup_service(service_component_name, with_port=True)
459 for dr_sub in dr_subs:
460 scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
461 if "route" not in dr_sub:
462 raise NonRecoverableError("'route' key missing from data router subscriber")
463 path = dr_sub["route"]
464 dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
465 scheme=scheme, host=subscriber_host, path=path)
466 kwargs[dr_sub["name"]] = dr_sub
470 @wrap_error_handling_start
471 @merge_inputs_for_start
474 def create_and_start_container_for_components_with_streams(**start_inputs):
475 """Create Docker container and start for components that have streams
477 This operation method is to be used with the DockerContainerForComponents
478 node type. After launching the container, the plugin will verify with Consul
479 that the app is up and healthy before terminating.
482 **_update_delivery_url(
484 **_create_and_start_component(
485 **_enhance_docker_params(
486 **_parse_cloudify_context(**start_inputs))))))
489 @wrap_error_handling_start
492 def create_and_start_container_for_platforms(**kwargs):
493 """Create Docker container and start for platform services
495 This operation method is to be used with the ContainerizedPlatformComponent
498 # Capture node properties
499 image = ctx.node.properties["image"]
500 docker_config = ctx.node.properties.get("docker_config", {})
501 if "healthcheck" in docker_config:
502 kwargs["readiness"] = docker_config["healthcheck"]
503 if "dns_name" in ctx.node.properties:
504 service_component_name = ctx.node.properties["dns_name"]
506 service_component_name = ctx.node.properties["name"]
508 # Set some labels for the Kubernetes pods
510 "cfydeployment" : ctx.deployment.id,
511 "cfynode": ctx.node.name,
512 "cfynodeinstance": ctx.instance.id
515 host_port = ctx.node.properties["host_port"]
516 container_port = ctx.node.properties["container_port"]
518 # Cloudify properties are all required and Cloudify complains that None
519 # is not a valid type for integer. Defaulting to 0 to indicate to not
520 # use this and not to set a specific port mapping in cases like service
522 if container_port != 0:
523 # Doing this because other nodes might want to use this property
524 port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
525 ports = kwargs.get("ports", []) + [ port_mapping ]
526 kwargs["ports"] = ports
527 if "ports" not in kwargs:
528 ctx.logger.warn("No port mappings defined. Will randomly assign port.")
530 # All of the new node properties could be handled more DRYly!
531 # If a registration to MSB is required, then set up the registration info
532 if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
533 kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]
535 # If centralized logging via ELK is desired, then set up the logging info
536 if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
537 kwargs["log_info"] = ctx.node.properties["log_info"]
539 # Pick up replica count and always_pull_image flag
540 if "replicas" in ctx.node.properties:
541 kwargs["replicas"] = ctx.node.properties["replicas"]
542 if "always_pull_image" in ctx.node.properties:
543 kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
544 _create_and_start_container(service_component_name, image, **kwargs)
546 ctx.logger.info("Container started: {0}".format(service_component_name))
548 # Verify that the container is healthy
550 max_wait = kwargs.get("max_wait", 300)
552 if _verify_container(service_component_name, max_wait):
553 ctx.logger.info("Container is healthy: {0}".format(service_component_name))
556 @wrap_error_handling_start
559 def create_and_start_container(**kwargs):
560 """Create Docker container and start"""
561 service_component_name = ctx.node.properties["name"]
562 ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
564 image = ctx.node.properties["image"]
566 _create_and_start_container(service_component_name, image,**kwargs)
568 ctx.logger.info("Component deployed: {0}".format(service_component_name))
573 def stop_and_remove_container(**kwargs):
574 """Stop and remove Docker container"""
576 deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
577 k8sclient.undeploy(deployment_description)
579 except Exception as e:
580 ctx.logger.error("Unexpected error while stopping container: {0}"
585 def scale(replicas, **kwargs):
586 """Change number of replicas in the deployment"""
588 current_replicas = ctx.instance.runtime_properties["replicas"]
589 ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas))
591 deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
592 k8sclient.scale(deployment_description, replicas)
593 ctx.instance.runtime_properties["replicas"] = replicas
594 except Exception as e:
595 ctx.logger.error ("Unexpected error while scaling {0}".format(str(e)))
597 ctx.logger.info("Ignoring request to scale to zero replicas")
600 @Policies.cleanup_policies_on_node
602 def cleanup_discovery(**kwargs):
603 """Delete configuration from Consul"""
604 service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
607 conn = dis.create_kv_conn(CONSUL_HOST)
608 dis.remove_service_component_config(conn, service_component_name)
609 except dis.DiscoveryConnectionError as e:
610 raise RecoverableError(e)
613 def _notify_container(**kwargs):
614 """Notify container using the policy section in the docker_config"""
615 dc = kwargs["docker_config"]
618 if dc["policy"]["trigger_type"] == "docker":
621 Need replacement for this in kubernetes.
622 Need to find all the pods that have been deployed
623 and execute the script in them.
624 Kubernetes does not appear to have a way to ask for a script
625 to be executed in all of the currently running pods for a
626 Kubernetes Deployment or ReplicaSet. We will have to find
627 each of them and run the script. The problem is that set of
628 pods could be changing. We can query to get all the pods, but
629 there's no guarantee the list won't change while we're trying to
632 In ONAP R2, all of the policy-driven components rely on polling.
635 # REVIEW: Need to finalize on the docker config policy data structure
636 script_path = dc["policy"]["script_path"]
637 updated_policies = kwargs["updated_policies"]
638 removed_policies = kwargs["removed_policies"]
639 policies = kwargs["policies"]
640 cmd = doc.build_policy_update_cmd(script_path, use_sh=False,
642 updated_policies=updated_policies,
643 removed_policies=removed_policies,
647 docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
648 docker_host_ip = _lookup_service(docker_host)
649 logins = _get_docker_logins()
650 client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)
652 container_id = kwargs["container_id"]
654 doc.notify_for_policy_update(client, container_id, cmd)
656 # else the default is no trigger
662 @Policies.update_policies_on_node()
664 def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
665 """Policy update task
667 This method is responsible for updating the application configuration and
668 notifying the applications that the change has occurred. This is to be used
669 for the dcae.interfaces.policy.policy_update operation.
671 :updated_policies: contains the list of changed policy-configs when configs_only=True
672 (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
674 update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
675 update_inputs["updated_policies"] = updated_policies
676 update_inputs["removed_policies"] = removed_policies
677 update_inputs["policies"] = policies
679 _notify_container(**update_inputs)