# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

# Lifecycle interface calls for containerized components

# Needed by Cloudify Manager to load google.auth for the Kubernetes python client
import cloudify_importer

import time, copy
from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
from onap_dcae_dcaepolicy_lib import Policies
from k8splugin import discovery as dis
from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
    merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
from k8splugin.exceptions import DockerPluginDeploymentError
from k8splugin import utils
from configure import configure
import k8sclient

# Get configuration
plugin_conf = configure.configure()
CONSUL_HOST = plugin_conf.get("consul_host")
CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
DCAE_NAMESPACE = plugin_conf.get("namespace")

# Used to construct delivery urls for data router subscribers. Data router in FTL
# requires https, but this author believes that ONAP is to default to http.
DEFAULT_SCHEME = "http"

# Property keys
SERVICE_COMPONENT_NAME = "service_component_name"
CONTAINER_ID = "container_id"
APPLICATION_CONFIG = "application_config"

# Lifecycle interface calls for dcae.nodes.DockerContainer

def _setup_for_discovery(**kwargs):
    """Setup for config discovery"""
    try:
        name = kwargs['name']
        application_config = kwargs[APPLICATION_CONFIG]

        # NOTE: application_config is no longer a json string; it comes in as a
        # YAML map, which translates to a dict. We don't have to do any
        # preprocessing anymore.
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.push_service_component_config(conn, name, application_config)
        return kwargs
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)
    except Exception as e:
        ctx.logger.error("Unexpected error while pushing configuration: {0}"
                         .format(str(e)))
        raise NonRecoverableError(e)
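
# Illustrative sketch of the create-time discovery setup (commented out: it
# needs a reachable Consul, and all values here are hypothetical):
#
#   _setup_for_discovery(
#       name="example-component",
#       application_config={"collector_port": 8080})
#
# This pushes {"collector_port": 8080} into the Consul KV store under the
# component's generated name.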

def _generate_component_name(**kwargs):
    """Generate component name"""
    service_component_type = kwargs['service_component_type']
    name_override = kwargs['service_component_name_override']

    kwargs['name'] = name_override if name_override \
            else dis.generate_service_component_name(service_component_type)
    return kwargs

def _done_for_create(**kwargs):
    """Wrap up create operation"""
    name = kwargs['name']
    kwargs[SERVICE_COMPONENT_NAME] = name
    # All updates to the runtime_properties happen here. I don't see a reason
    # why we shouldn't do this because the context is not being mutated by
    # something else, and it keeps the other functions pure (pure in the sense
    # of not dealing with CloudifyContext).
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done setting up: {0}".format(name))
    return kwargs

@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components(**create_inputs):
    """Create step for Docker containers that are components

    This interface is responsible for:

    1. Generating service component name
    2. Populating config information into Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **_generate_component_name(
                    **create_inputs)))

def _parse_streams(**kwargs):
    """Parse streams and setup for DMaaP plugin"""
    # The DMaaP plugin requires this plugin to set the runtime properties
    # keyed by the node name.
    def setup_publishes(s):
        kwargs[s["name"]] = s

    map(setup_publishes, kwargs["streams_publishes"])

    def setup_subscribes(s):
        if s["type"] == "data_router":
            # If username and password have not been provided, generate them. The
            # DMaaP plugin doesn't generate them for subscribers. The generation
            # code and the username/password lengths were lifted from the DMaaP
            # plugin.

            # Don't want to mutate the source
            s = copy.deepcopy(s)
            if not s.get("username", None):
                s["username"] = utils.random_string(8)
            if not s.get("password", None):
                s["password"] = utils.random_string(10)

        kwargs[s["name"]] = s

    # NOTE: The delivery url is constructed and set up in the start operation
    map(setup_subscribes, kwargs["streams_subscribes"])

    return kwargs
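
# Illustrative input/output for _parse_streams (hypothetical values): a data
# router subscriber entry such as
#
#   {"name": "feed00", "type": "data_router", "location": "loc00"}
#
# ends up copied into kwargs under the key "feed00", with a generated
# 8-character "username" and 10-character "password" added.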

def _setup_for_discovery_streams(**kwargs):
    """Setup for discovery of streams

    Specifically, there's a race condition this call addresses for the data router
    subscriber case. The component needs its feed subscriber information but the
    DMaaP plugin doesn't provide this until after the docker plugin start
    operation.
    """
    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
            if s["type"] == "data_router"]

    if dr_subs:
        dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
        conn = dis.create_kv_conn(CONSUL_HOST)

        def add_feed(dr_sub):
            # delivery url and subscriber id will be filled in by the dmaap plugin later
            v = { "location": dr_sub["location"], "delivery_url": None,
                    "username": dr_sub["username"], "password": dr_sub["password"],
                    "subscriber_id": None }
            return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) is not None

        try:
            if all(map(add_feed, dr_subs)):
                return kwargs
        except Exception as e:
            raise NonRecoverableError(e)

        # You should never get here
        raise NonRecoverableError("Failure updating feed streams in Consul")
    else:
        return kwargs
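
# Illustrative Consul entry written by _setup_for_discovery_streams
# (hypothetical values): for a component named "example-component" with one
# data router subscriber "feed00", the key "example-component:dmaap" gets
#
#   {"feed00": {"location": "loc00", "delivery_url": None,
#               "username": "abcd1234", "password": "efgh5678ij",
#               "subscriber_id": None}}
#
# where delivery_url and subscriber_id are filled in later by the DMaaP plugin.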

@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components_with_streams(**create_inputs):
    """Create step for Docker containers that are components that use DMaaP

    This interface is responsible for:

    1. Generating service component name
    2. Setting up runtime properties for the DMaaP plugin
    3. Populating application config into Consul
    4. Populating DMaaP config for data router subscribers in Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **_setup_for_discovery_streams(
                    **_parse_streams(
                        **_generate_component_name(
                            **create_inputs)))))

@merge_inputs_for_create
@monkeypatch_loggers
@operation
def create_for_platforms(**create_inputs):
    """Create step for Docker containers that are platform components

    This interface is responsible for:

    1. Populating config information into Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **create_inputs))

def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
        with_port=False):
    conn = dis.create_kv_conn(consul_host)
    results = dis.lookup_service(conn, service_component_name)

    if with_port:
        # Just grab the first result
        result = results[0]
        return "{address}:{port}".format(address=result["ServiceAddress"],
                port=result["ServicePort"])
    else:
        return results[0]["ServiceAddress"]
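
# Illustrative usage (commented out: requires a live Consul; the addresses are
# hypothetical):
#
#   _lookup_service("example-component")                  # => "10.42.0.5"
#   _lookup_service("example-component", with_port=True)  # => "10.42.0.5:30235"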

def _verify_container(service_component_name, max_wait):
    """Verify that the container is healthy

    Args:
    -----
    max_wait (integer): limit to how many attempts to make, which translates to
        seconds because each sleep is one second. 0 means infinite.

    Return:
    -------
    True if the component is healthy, else a DockerPluginDeploymentError
    exception will be raised.
    """
    num_attempts = 1

    while True:
        if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
            return True
        else:
            num_attempts += 1

            if max_wait > 0 and max_wait < num_attempts:
                raise DockerPluginDeploymentError("Container never became healthy")

            time.sleep(1)

def _create_and_start_container(container_name, image, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, a k8s Service or two.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    kwargs may have:
        - volumes: array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}}
        - ports: array of strings in the form "container_port:host_port"
        - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
        - always_pull_image: boolean. If true, sets image pull policy to "Always"
          so that a fresh copy of the image is always pulled. Otherwise, sets
          image pull policy to "IfNotPresent"
        - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - replicas: number of replicas to be launched initially
        - readiness: object with information needed to create a readiness check
    '''
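    # Illustrative kwargs (hypothetical values, matching the shapes above):
    #
    #   {"volumes": [{"host": {"path": "/var/log/example"},
    #                 "container": {"bind": "/opt/app/logs", "mode": "rw"}}],
    #    "ports": ["8080:0"],
    #    "envs": {"APP_NAME": "example"},
    #    "replicas": 1,
    #    "always_pull_image": False}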
    env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
            "CONFIG_BINDING_SERVICE": "config-binding-service" }
    env.update(kwargs.get("envs", {}))
    ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
    ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
    replicas = kwargs.get("replicas", 1)
    _, dep = k8sclient.deploy(DCAE_NAMESPACE,
                     container_name,
                     image,
                     replicas=replicas,
                     always_pull=kwargs.get("always_pull_image", False),
                     k8sconfig=plugin_conf,
                     volumes=kwargs.get("volumes", []),
                     ports=kwargs.get("ports", []),
                     msb_list=kwargs.get("msb_list"),
                     env=env,
                     labels=kwargs.get("labels", {}),
                     log_info=kwargs.get("log_info"),
                     readiness=kwargs.get("readiness"))

    # Capture the result of deployment for future use
    ctx.instance.runtime_properties["k8s_deployment"] = dep
    ctx.instance.runtime_properties["replicas"] = replicas
    ctx.logger.info("Deployment complete: {0}".format(dep))

def _parse_cloudify_context(**kwargs):
    """Parse Cloudify context

    Extract what is needed. This is an impure function because it requires ctx.
    """
    kwargs["deployment_id"] = ctx.deployment.id

    # Set some labels for the Kubernetes pods
    kwargs["labels"] = {
        "cfydeployment" : ctx.deployment.id,
        "cfynode": ctx.node.name,
        "cfynodeinstance": ctx.instance.id
    }

    # Pick up the centralized logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    return kwargs

def _enhance_docker_params(**kwargs):
    '''
    Set up Docker environment variables and readiness check info
    and inject into kwargs.
    '''

    # Get info for setting up readiness probe, if present
    docker_config = kwargs.get("docker_config", {})
    if "healthcheck" in docker_config:
        kwargs["readiness"] = docker_config["healthcheck"]

    envs = kwargs.get("envs", {})

    # Set tags on this component for its Consul registration as a service
    tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
    tags = [ str(tag) for tag in tags if tag is not None ]
    # Registrator will use this to register this component with tags. Must be
    # comma delimited.
    envs["SERVICE_TAGS"] = ",".join(tags)

    kwargs["envs"] = envs

    def combine_params(key, docker_config, kwargs):
        v = docker_config.get(key, []) + kwargs.get(key, [])
        if v:
            kwargs[key] = v
        return kwargs

    # Add the lists of ports and volumes unintelligently - meaning just add the
    # lists together with no deduping.
    kwargs = combine_params("ports", docker_config, kwargs)
    kwargs = combine_params("volumes", docker_config, kwargs)

    return kwargs
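
# Illustrative effect of _enhance_docker_params (hypothetical values): with
# docker_config = {"ports": ["8080:0"]} and incoming kwargs ports of
# ["9090:0"], combine_params leaves kwargs["ports"] == ["8080:0", "9090:0"],
# and SERVICE_TAGS might end up as "my_cfy_deployment,my_service_id".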

def _create_and_start_component(**kwargs):
    """Create and start component (container)"""
    image = kwargs["image"]
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    # Need to be picky and manually select out pieces because just using kwargs,
    # which contains everything, confused the execution of
    # _create_and_start_container because duplicate variables exist
    sub_kwargs = {
        "volumes": kwargs.get("volumes", []),
        "ports": kwargs.get("ports", None),
        "envs": kwargs.get("envs", {}),
        "log_info": kwargs.get("log_info", {}),
        "labels": kwargs.get("labels", {}),
        "readiness": kwargs.get("readiness", {})}
    _create_and_start_container(service_component_name, image, **sub_kwargs)

    # TODO: Use regular logging here
    ctx.logger.info("Container started: {0}".format(service_component_name))

    return kwargs

def _verify_component(**kwargs):
    """Verify component (container) is healthy"""
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]

    max_wait = kwargs.get("max_wait", 300)

    # Verify that the container is healthy

    if _verify_container(service_component_name, max_wait):
        # TODO: Use regular logging here
        ctx.logger.info("Container is healthy: {0}".format(service_component_name))

    return kwargs

def _done_for_start(**kwargs):
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
    return kwargs

def _setup_msb_registration(service_name, msb_reg):
    return {
        "serviceName" : service_name,
        "port" : msb_reg.get("port", "80"),
        "version" : msb_reg.get("version", "v1"),
        "url" : msb_reg.get("url_path", "/v1"),
        "protocol" : "REST",
        "enable_ssl" : msb_reg.get("uses_ssl", False),
        "visualRange" : "1"
    }

@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
    """Create Docker container and start for components

    This operation method is to be used with the DockerContainerForComponents
    node type. After launching the container, the plugin will verify with Consul
    that the app is up and healthy before terminating.
    """
    _done_for_start(
            **_verify_component(
                **_create_and_start_component(
                    **_enhance_docker_params(
                        **_parse_cloudify_context(**start_inputs)))))

def _update_delivery_url(**kwargs):
    """Update the delivery url for data router subscribers"""
    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
            if s["type"] == "data_router"]

    if dr_subs:
        service_component_name = kwargs[SERVICE_COMPONENT_NAME]
        # TODO: Should NOT be setting up the delivery url with ip addresses
        # because in the https case, this will not work because data router does
        # a certificate validation using the fqdn.
        subscriber_host = _lookup_service(service_component_name, with_port=True)

        for dr_sub in dr_subs:
            scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
            if "route" not in dr_sub:
                raise NonRecoverableError("'route' key missing from data router subscriber")
            path = dr_sub["route"]
            dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
                    scheme=scheme, host=subscriber_host, path=path)
            kwargs[dr_sub["name"]] = dr_sub

    return kwargs
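
# Illustrative result (hypothetical values): a subscriber with route "data"
# on a component resolved to "10.42.0.5:30235", using the default scheme,
# gets delivery_url "http://10.42.0.5:30235/data".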

@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components_with_streams(**start_inputs):
    """Create Docker container and start for components that have streams

    This operation method is to be used with the DockerContainerForComponents
    node type. After launching the container, the plugin will verify with Consul
    that the app is up and healthy before terminating.
    """
    _done_for_start(
            **_update_delivery_url(
                **_verify_component(
                    **_create_and_start_component(
                        **_enhance_docker_params(
                            **_parse_cloudify_context(**start_inputs))))))

@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_platforms(**kwargs):
    """Create Docker container and start for platform services

    This operation method is to be used with the ContainerizedPlatformComponent
    node type.
    """
    # Capture node properties
    image = ctx.node.properties["image"]
    docker_config = ctx.node.properties.get("docker_config", {})
    if "healthcheck" in docker_config:
        kwargs["readiness"] = docker_config["healthcheck"]
    if "dns_name" in ctx.node.properties:
        service_component_name = ctx.node.properties["dns_name"]
    else:
        service_component_name = ctx.node.properties["name"]

    # Set some labels for the Kubernetes pods
    kwargs["labels"] = {
        "cfydeployment" : ctx.deployment.id,
        "cfynode": ctx.node.name,
        "cfynodeinstance": ctx.instance.id
    }

    host_port = ctx.node.properties["host_port"]
    container_port = ctx.node.properties["container_port"]

    # Cloudify properties are all required, and Cloudify complains that None
    # is not a valid type for integer. Defaulting to 0 to indicate not to
    # use this and not to set a specific port mapping in cases like service
    # change handler.
    if container_port != 0:
        # Doing this because other nodes might want to use this property
        port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
        ports = kwargs.get("ports", []) + [ port_mapping ]
        kwargs["ports"] = ports
    if "ports" not in kwargs:
        ctx.logger.warn("No port mappings defined. Will randomly assign port.")

    # All of the new node properties could be handled more DRYly!
    # If a registration to MSB is required, then set up the registration info
    if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
        kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]

    # If centralized logging via ELK is desired, then set up the logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    _create_and_start_container(service_component_name, image, **kwargs)

    ctx.logger.info("Container started: {0}".format(service_component_name))

    # Verify that the container is healthy

    max_wait = kwargs.get("max_wait", 300)

    if _verify_container(service_component_name, max_wait):
        ctx.logger.info("Container is healthy: {0}".format(service_component_name))

@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
    """Create Docker container and start"""
    service_component_name = ctx.node.properties["name"]
    ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name

    image = ctx.node.properties["image"]

    _create_and_start_container(service_component_name, image, **kwargs)

    ctx.logger.info("Component deployed: {0}".format(service_component_name))

@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
    """Stop and remove Docker container"""
    try:
        deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
        k8sclient.undeploy(deployment_description)
    except Exception as e:
        ctx.logger.error("Unexpected error while stopping container: {0}"
                         .format(str(e)))

@wrap_error_handling_update
@monkeypatch_loggers
@operation
def scale(replicas, **kwargs):
    """Change number of replicas in the deployment"""
    if replicas > 0:
        current_replicas = ctx.instance.runtime_properties["replicas"]
        ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas))
        deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
        k8sclient.scale(deployment_description, replicas)
        ctx.instance.runtime_properties["replicas"] = replicas

        # Verify that the scaling took place as expected
        max_wait = kwargs.get("max_wait", 300)
        service_component_name = ctx.instance.runtime_properties["service_component_name"]
        if _verify_container(service_component_name, max_wait):
            ctx.logger.info("Scaling complete: {0} from {1} to {2} instance(s)".format(service_component_name, current_replicas, replicas))
    else:
        ctx.logger.info("Ignoring request to scale to zero replicas")
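
# Illustrative invocation (commented out: scale is normally driven through a
# Cloudify workflow; max_wait is optional and the values are hypothetical):
#
#   scale(replicas=3, max_wait=300)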

@wrap_error_handling_update
@monkeypatch_loggers
@operation
def update_image(image, **kwargs):
    """Update the application container image in the deployment"""
    if image:
        current_image = ctx.instance.runtime_properties["image"]
        ctx.logger.info("Updating application container image from {0} to {1}".format(current_image, image))
        deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
        k8sclient.upgrade(deployment_description, image)
        ctx.instance.runtime_properties["image"] = image

        # Verify that the update took place as expected
        max_wait = kwargs.get("max_wait", 300)
        service_component_name = ctx.instance.runtime_properties["service_component_name"]
        if _verify_container(service_component_name, max_wait):
            ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
    else:
        ctx.logger.info("Ignoring update_image request with unusable image '{0}'".format(str(image)))

# TODO: implement rollback operation when kubernetes python client fix is available.
# (See comments in k8sclient.py.)
# In the meantime, it's possible to undo an update_image operation by doing a second
# update_image that specifies the older image.
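
# Illustrative undo of an update_image (commented out; the image names are
# hypothetical):
#
#   update_image("nexus.example.org/example-app:1.1.0")   # roll forward
#   update_image("nexus.example.org/example-app:1.0.0")   # roll back to previous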

@monkeypatch_loggers
@Policies.cleanup_policies_on_node
@operation
def cleanup_discovery(**kwargs):
    """Delete configuration from Consul"""
    service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]

    try:
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.remove_service_component_config(conn, service_component_name)
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)

def _notify_container(**kwargs):
    """Notify container using the policy section in the docker_config"""
    dc = kwargs["docker_config"]

    if "policy" in dc:
        if dc["policy"]["trigger_type"] == "docker":
            """
            Need replacement for this in kubernetes.
            Need to find all the pods that have been deployed
            and execute the script in them.
            Kubernetes does not appear to have a way to ask for a script
            to be executed in all of the currently running pods for a
            Kubernetes Deployment or ReplicaSet. We will have to find
            each of them and run the script. The problem is that the set of
            pods could be changing. We can query to get all the pods, but
            there's no guarantee the list won't change while we're trying to
            execute the script in each of them.

            In ONAP R2, all of the policy-driven components rely on polling.
            """
            """
            Old Docker-based notification code, kept for reference:

            # REVIEW: Need to finalize on the docker config policy data structure
            script_path = dc["policy"]["script_path"]
            updated_policies = kwargs["updated_policies"]
            removed_policies = kwargs["removed_policies"]
            policies = kwargs["policies"]
            cmd = doc.build_policy_update_cmd(script_path, use_sh=False,
                    msg_type="policies",
                    updated_policies=updated_policies,
                    removed_policies=removed_policies,
                    policies=policies)

            docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
            docker_host_ip = _lookup_service(docker_host)
            logins = _get_docker_logins()
            client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)

            container_id = kwargs["container_id"]

            doc.notify_for_policy_update(client, container_id, cmd)
            """
    # else the default is no trigger

    return kwargs

@monkeypatch_loggers
@Policies.update_policies_on_node()
@operation
def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
    """Policy update task

    This method is responsible for updating the application configuration and
    notifying the applications that the change has occurred. This is to be used
    for the dcae.interfaces.policy.policy_update operation.

    :updated_policies: contains the list of changed policy-configs when configs_only=True
        (default). Use configs_only=False to bring the full policy objects in :updated_policies:.
    """
    update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
    update_inputs["updated_policies"] = updated_policies
    update_inputs["removed_policies"] = removed_policies
    update_inputs["policies"] = policies

    _notify_container(**update_inputs)