# ============LICENSE_START=======================================================
# ================================================================================
# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

# Lifecycle interface calls for containerized components

# Needed by Cloudify Manager to load google.auth for the Kubernetes python client
import cloudify_importer

import copy
import time

from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
import dockering as doc
from onap_dcae_dcaepolicy_lib import Policies
from k8splugin import discovery as dis
from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
    merge_inputs_for_start, merge_inputs_for_create
from k8splugin.exceptions import DockerPluginDeploymentError
from k8splugin import utils
from configure import configure
from k8sclient import k8sclient

# Get plugin configuration
plugin_conf = configure.configure()
CONSUL_HOST = plugin_conf.get("consul_host")
CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
DCAE_NAMESPACE = plugin_conf.get("namespace")

# Used to construct delivery urls for data router subscribers. Data router in FTL
# requires https, but this author believes that ONAP is to default to http.
DEFAULT_SCHEME = "http"

# Property keys
SERVICE_COMPONENT_NAME = "service_component_name"
CONTAINER_ID = "container_id"
APPLICATION_CONFIG = "application_config"

# Lifecycle interface calls for dcae.nodes.DockerContainer

def _setup_for_discovery(**kwargs):
    """Setup for config discovery"""
    try:
        name = kwargs['name']
        application_config = kwargs[APPLICATION_CONFIG]

        # NOTE: application_config is no longer a json string; it is input as a
        # YAML map, which translates to a dict. We don't have to do any
        # preprocessing anymore.
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.push_service_component_config(conn, name, application_config)
        return kwargs
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)
    except Exception as e:
        ctx.logger.error("Unexpected error while pushing configuration: {0}"
                .format(str(e)))
        raise NonRecoverableError(e)

def _generate_component_name(**kwargs):
    """Generate the service component name"""
    service_component_type = kwargs['service_component_type']
    name_override = kwargs['service_component_name_override']

    kwargs['name'] = name_override if name_override \
            else dis.generate_service_component_name(service_component_type)
    return kwargs

def _done_for_create(**kwargs):
    """Wrap up the create operation"""
    name = kwargs['name']
    kwargs[SERVICE_COMPONENT_NAME] = name
    # All updates to the runtime_properties happen here. I don't see a reason
    # why we shouldn't do this, because the context is not being mutated by
    # anything else, and it keeps the other functions pure (pure in the sense
    # of not dealing with CloudifyContext).
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done setting up: {0}".format(name))
    return kwargs


@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components(**create_inputs):
    """Create step for Docker containers that are components

    This interface is responsible for:

    1. Generating the service component name
    2. Populating config information into Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **_generate_component_name(
                    **create_inputs)))
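
# Illustrative sketch (hypothetical values) of the create flow above:
#   create_inputs = {"service_component_type": "dcae-collector",
#                    "service_component_name_override": None,
#                    "application_config": {"collector_port": 8080}}
# _generate_component_name adds "name", _setup_for_discovery pushes the config
# to Consul under that name, and _done_for_create copies everything into
# runtime_properties.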


def _parse_streams(**kwargs):
    """Parse streams and set up for the DMaaP plugin"""
    # The DMaaP plugin requires this plugin to set the runtime properties
    # keyed by the node name.
    def setup_publishes(s):
        kwargs[s["name"]] = s

    for stream in kwargs["streams_publishes"]:
        setup_publishes(stream)

    def setup_subscribes(s):
        if s["type"] == "data_router":
            # If username and password have not been provided, then generate
            # them. The DMaaP plugin doesn't generate them for subscribers. The
            # generation code and the username/password lengths have been
            # lifted from the DMaaP plugin.

            # Don't want to mutate the source
            s = copy.deepcopy(s)
            if not s.get("username", None):
                s["username"] = utils.random_string(8)
            if not s.get("password", None):
                s["password"] = utils.random_string(10)

        kwargs[s["name"]] = s

    # NOTE: The delivery url is constructed and set up in the start operation
    for stream in kwargs["streams_subscribes"]:
        setup_subscribes(stream)

    return kwargs
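
# Illustrative example (hypothetical values): a data_router subscriber in
# streams_subscribes such as
#   {"name": "feed00", "type": "data_router", "location": "loc00",
#    "route": "identity", "scheme": "https"}
# comes out of _parse_streams with generated credentials, e.g.
#   {..., "username": "Ab1Cd2Ef", "password": "Gh3Ij4Kl5M"}
# and is keyed into kwargs under "feed00" for the DMaaP plugin to consume.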

def _setup_for_discovery_streams(**kwargs):
    """Setup for discovery of streams

    Specifically, there's a race condition this call addresses for the data
    router subscriber case. The component needs its feed subscriber information,
    but the DMaaP plugin doesn't provide this until after the docker plugin
    start operation.
    """
    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
            if s["type"] == "data_router"]

    if dr_subs:
        dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
        conn = dis.create_kv_conn(CONSUL_HOST)

        def add_feed(dr_sub):
            # delivery url and subscriber id will be filled in by the dmaap plugin later
            v = { "location": dr_sub["location"], "delivery_url": None,
                    "username": dr_sub["username"], "password": dr_sub["password"],
                    "subscriber_id": None }
            return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) is not None

        try:
            if all(map(add_feed, dr_subs)):
                return kwargs
        except Exception as e:
            raise NonRecoverableError(e)

        # You should never get here
        raise NonRecoverableError("Failure updating feed streams in Consul")
    else:
        return kwargs
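
# Illustrative example (hypothetical values): for component name "xyz", the
# Consul key "xyz:dmaap" might end up holding
#   {"feed00": {"location": "loc00", "delivery_url": None,
#               "username": "Ab1Cd2Ef", "password": "Gh3Ij4Kl5M",
#               "subscriber_id": None}}
# with delivery_url and subscriber_id filled in later by the DMaaP plugin.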


@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components_with_streams(**create_inputs):
    """Create step for Docker containers that are components that use DMaaP

    This interface is responsible for:

    1. Generating the service component name
    2. Setting up runtime properties for the DMaaP plugin
    3. Populating application config into Consul
    4. Populating DMaaP config for data router subscribers into Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **_setup_for_discovery_streams(
                    **_parse_streams(
                        **_generate_component_name(
                            **create_inputs)))))


@merge_inputs_for_create
@monkeypatch_loggers
@operation
def create_for_platforms(**create_inputs):
    """Create step for Docker containers that are platform components

    This interface is responsible for:

    1. Populating config information into Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **create_inputs))


def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
        with_port=False):
    conn = dis.create_kv_conn(consul_host)
    results = dis.lookup_service(conn, service_component_name)

    if with_port:
        # Just grab the first result
        result = results[0]
        return "{address}:{port}".format(address=result["ServiceAddress"],
                port=result["ServicePort"])
    else:
        return results[0]["ServiceAddress"]
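
# Illustrative example (hypothetical values):
#   _lookup_service("xyz-dcae-collector", with_port=True)  ->  "10.42.0.5:8080"
#   _lookup_service("xyz-dcae-collector")                   ->  "10.42.0.5"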


def _verify_container(service_component_name, max_wait):
    """Verify that the container is healthy

    Args:
    -----
    max_wait (integer): limit on how many attempts to make, which translates to
        seconds because each sleep is one second. 0 means infinite.

    Return:
    -------
    True if the component is healthy, else a DockerPluginDeploymentError
    exception will be raised.
    """
    num_attempts = 1

    while True:
        if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
            return True
        else:
            num_attempts += 1

            if max_wait > 0 and max_wait < num_attempts:
                raise DockerPluginDeploymentError("Container never became healthy")

            time.sleep(1)
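
# Illustrative behavior (hypothetical name): _verify_container("xyz", 300)
# polls k8sclient.is_available once per second and raises
# DockerPluginDeploymentError if "xyz" isn't available within roughly 300s.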

def _create_and_start_container(container_name, image, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, a k8s Service or two.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    kwargs may have:
        - volumes: array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}}
        - ports: array of strings in the form "container_port:host_port"
        - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
        - always_pull: boolean. If true, sets the image pull policy to "Always"
          so that a fresh copy of the image is always pulled. Otherwise, sets the
          image pull policy to "IfNotPresent"
        - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - replicas: number of replicas to be launched initially
    '''
    env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
            "CONFIG_BINDING_SERVICE": "config-binding-service" }
    env.update(kwargs.get("envs", {}))
    ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
    ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
    replicas = kwargs.get("replicas", 1)
    _, dep = k8sclient.deploy(DCAE_NAMESPACE,
                     container_name,
                     image,
                     replicas=replicas,
                     always_pull=kwargs.get("always_pull_image", False),
                     k8sconfig=plugin_conf,
                     volumes=kwargs.get("volumes", []),
                     ports=kwargs.get("ports", []),
                     msb_list=kwargs.get("msb_list"),
                     env=env,
                     labels=kwargs.get("labels", {}),
                     log_info=kwargs.get("log_info"))

    # Capture the result of deployment for future use
    ctx.instance.runtime_properties["k8s_deployment"] = dep
    ctx.instance.runtime_properties["replicas"] = replicas
    ctx.logger.info("Deployment complete: {0}".format(dep))
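
# Illustrative example (hypothetical values) of a call to the helper above:
#   _create_and_start_container("xyz-dcae-collector", "nexus3/onap/collector:1.0",
#       volumes=[{"host": {"path": "/var/log"},
#                 "container": {"bind": "/opt/app/log", "mode": "rw"}}],
#       ports=["8080:30280"],
#       envs={"MY_ENV": "value"},
#       replicas=2)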

def _parse_cloudify_context(**kwargs):
    """Parse the Cloudify context

    Extract what is needed. This is an impure function because it requires ctx.
    """
    kwargs["deployment_id"] = ctx.deployment.id

    # Set some labels for the Kubernetes pods
    kwargs["labels"] = {
        "cfydeployment" : ctx.deployment.id,
        "cfynode": ctx.node.name,
        "cfynodeinstance": ctx.instance.id
    }

    # Pick up the centralized logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    return kwargs


def _enhance_docker_params(**kwargs):
    """Set up Docker envs"""
    docker_config = kwargs.get("docker_config", {})

    envs = kwargs.get("envs", {})
    # NOTE: Healthchecks are optional until prepared to handle use cases that
    # don't necessarily use http
    envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
            if "healthcheck" in docker_config else {}
    envs.update(envs_healthcheck)

    # Set tags on this component for its Consul registration as a service
    tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
    tags = [ str(tag) for tag in tags if tag is not None ]
    # Registrator will use this to register this component with tags. Must be
    # comma delimited.
    envs["SERVICE_TAGS"] = ",".join(tags)

    kwargs["envs"] = envs

    def combine_params(key, docker_config, kwargs):
        v = docker_config.get(key, []) + kwargs.get(key, [])
        if v:
            kwargs[key] = v
        return kwargs

    # Add the lists of ports and volumes unintelligently - meaning just add the
    # lists together with no deduping.
    kwargs = combine_params("ports", docker_config, kwargs)
    kwargs = combine_params("volumes", docker_config, kwargs)

    return kwargs
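
# Illustrative example (hypothetical values): if docker_config carries
# {"ports": ["8080:0"]} and kwargs carries {"ports": ["9090:0"]}, then after
# combine_params the merged kwargs holds {"ports": ["8080:0", "9090:0"]};
# duplicates are intentionally not removed.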


def _create_and_start_component(**kwargs):
    """Create and start the component (container)"""
    image = kwargs["image"]
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    # Need to be picky and manually select out pieces, because just using kwargs
    # (which contains everything) confused the execution of
    # _create_and_start_container because duplicate variables exist
    sub_kwargs = {
        "volumes": kwargs.get("volumes", []),
        "ports": kwargs.get("ports", None),
        "envs": kwargs.get("envs", {}),
        "log_info": kwargs.get("log_info", {}),
        "labels": kwargs.get("labels", {})}
    _create_and_start_container(service_component_name, image, **sub_kwargs)

    # TODO: Use regular logging here
    ctx.logger.info("Container started: {0}".format(service_component_name))

    return kwargs


def _verify_component(**kwargs):
    """Verify that the component (container) is healthy"""
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    # TODO: "Consul doesn't make its first health check immediately upon registration.
    # Instead it waits for the health check interval to pass."
    # A possible enhancement is to read the interval (and possibly the timeout) from
    # docker_config and multiply that by a number to come up with a more suitable
    # max_wait.

    max_wait = kwargs.get("max_wait", 300)

    # Verify that the container is healthy
    if _verify_container(service_component_name, max_wait):
        # TODO: Use regular logging here
        ctx.logger.info("Container is healthy: {0}".format(service_component_name))

    return kwargs

def _done_for_start(**kwargs):
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
    return kwargs

def _setup_msb_registration(service_name, msb_reg):
    return {
        "serviceName" : service_name,
        "port" : msb_reg.get("port", "80"),
        "version" : msb_reg.get("version", "v1"),
        "url" : msb_reg.get("url_path", "/v1"),
        "protocol" : "REST",
        "enable_ssl" : msb_reg.get("uses_ssl", False),
        "visualRange" : "1"
    }


@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
    """Create Docker container and start for components

    This operation method is to be used with the DockerContainerForComponents
    node type. After launching the container, the plugin will verify with Consul
    that the app is up and healthy before terminating.
    """
    _done_for_start(
            **_verify_component(
                **_create_and_start_component(
                    **_enhance_docker_params(
                        **_parse_cloudify_context(**start_inputs)))))


def _update_delivery_url(**kwargs):
    """Update the delivery url for data router subscribers"""
    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
            if s["type"] == "data_router"]

    if dr_subs:
        service_component_name = kwargs[SERVICE_COMPONENT_NAME]
        # TODO: Should NOT be setting up the delivery url with ip addresses,
        # because in the https case this will not work: data router does
        # a certificate validation using the fqdn.
        subscriber_host = _lookup_service(service_component_name, with_port=True)

        for dr_sub in dr_subs:
            scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
            if "route" not in dr_sub:
                raise NonRecoverableError("'route' key missing from data router subscriber")
            path = dr_sub["route"]
            dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
                    scheme=scheme, host=subscriber_host, path=path)
            kwargs[dr_sub["name"]] = dr_sub

    return kwargs
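
# Illustrative example (hypothetical values): with subscriber_host
# "10.42.0.5:8080", scheme "http", and route "identity", the resulting
# delivery_url is "http://10.42.0.5:8080/identity".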


@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components_with_streams(**start_inputs):
    """Create Docker container and start for components that have streams

    This operation method is to be used with the DockerContainerForComponents
    node type. After launching the container, the plugin will verify with Consul
    that the app is up and healthy before terminating.
    """
    _done_for_start(
            **_update_delivery_url(
                **_verify_component(
                    **_create_and_start_component(
                        **_enhance_docker_params(
                            **_parse_cloudify_context(**start_inputs))))))


@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_platforms(**kwargs):
    """Create Docker container and start for platform services

    This operation method is to be used with the ContainerizedPlatformComponent
    node type.
    """
    # Capture node properties
    image = ctx.node.properties["image"]
    docker_config = ctx.node.properties.get("docker_config", {})
    if "dns_name" in ctx.node.properties:
        service_component_name = ctx.node.properties["dns_name"]
    else:
        service_component_name = ctx.node.properties["name"]

    envs = kwargs.get("envs", {})
    # NOTE: Healthchecks are optional until prepared to handle use cases that
    # don't necessarily use http
    envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
            if "healthcheck" in docker_config else {}
    envs.update(envs_healthcheck)
    kwargs["envs"] = envs

    # Set some labels for the Kubernetes pods
    kwargs["labels"] = {
        "cfydeployment" : ctx.deployment.id,
        "cfynode": ctx.node.name,
        "cfynodeinstance": ctx.instance.id
    }

    host_port = ctx.node.properties["host_port"]
    container_port = ctx.node.properties["container_port"]

    # Cloudify properties are all required, and Cloudify complains that None
    # is not a valid type for integer. Defaulting to 0 to indicate not to
    # use this and not to set a specific port mapping in cases like service
    # change handler.
    if container_port != 0:
        # Doing this because other nodes might want to use this property
        port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
        ports = kwargs.get("ports", []) + [ port_mapping ]
        kwargs["ports"] = ports
    if "ports" not in kwargs:
        ctx.logger.warn("No port mappings defined. Will randomly assign port.")
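
    # Illustrative example (hypothetical values): container_port 8080 with
    # host_port 30280 yields port_mapping "8080:30280", following the
    # "container_port:host_port" convention.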

    # All of the new node properties could be handled more DRYly!
    # If a registration to MSB is required, then set up the registration info
    if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
        kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]

    # If centralized logging via ELK is desired, then set up the logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    _create_and_start_container(service_component_name, image, **kwargs)

    ctx.logger.info("Container started: {0}".format(service_component_name))

    # Verify that the container is healthy
    max_wait = kwargs.get("max_wait", 300)

    if _verify_container(service_component_name, max_wait):
        ctx.logger.info("Container is healthy: {0}".format(service_component_name))


@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
    """Create Docker container and start"""
    service_component_name = ctx.node.properties["name"]
    ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name

    image = ctx.node.properties["image"]

    _create_and_start_container(service_component_name, image, **kwargs)

    ctx.logger.info("Component deployed: {0}".format(service_component_name))


@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
    """Stop and remove Docker container"""
    try:
        deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
        k8sclient.undeploy(deployment_description)
    except Exception as e:
        ctx.logger.error("Unexpected error while stopping container: {0}"
                .format(str(e)))


@monkeypatch_loggers
@operation
def scale(replicas, **kwargs):
    """Change the number of replicas in the deployment"""
    if replicas > 0:
        current_replicas = ctx.instance.runtime_properties["replicas"]
        ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas))
        try:
            deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
            k8sclient.scale(deployment_description, replicas)
            ctx.instance.runtime_properties["replicas"] = replicas
        except Exception as e:
            ctx.logger.error("Unexpected error while scaling: {0}".format(str(e)))
    else:
        ctx.logger.info("Ignoring request to scale to zero replicas")
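
# Illustrative example: if the deployment currently runs 1 replica, invoking
# the scale operation with replicas=3 asks Kubernetes to scale the Deployment
# to 3 pods and records replicas=3 in runtime_properties; replicas=0 is
# ignored rather than tearing the deployment down.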


@monkeypatch_loggers
@Policies.cleanup_policies_on_node
@operation
def cleanup_discovery(**kwargs):
    """Delete configuration from Consul"""
    service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]

    try:
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.remove_service_component_config(conn, service_component_name)
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)


def _notify_container(**kwargs):
    """Notify the container using the policy section in the docker_config"""
    dc = kwargs["docker_config"]

    if "policy" in dc:
        if dc["policy"]["trigger_type"] == "docker":
            pass
            """
            Need replacement for this in kubernetes.
            Need to find all the pods that have been deployed
            and execute the script in them.
            Kubernetes does not appear to have a way to ask for a script
            to be executed in all of the currently running pods for a
            Kubernetes Deployment or ReplicaSet. We will have to find
            each of them and run the script. The problem is that the set of
            pods could be changing. We can query to get all the pods, but
            there's no guarantee the list won't change while we're trying to
            execute the script.

            In ONAP R2, all of the policy-driven components rely on polling.
            """
            """
            # REVIEW: Need to finalize on the docker config policy data structure
            script_path = dc["policy"]["script_path"]
            updated_policies = kwargs["updated_policies"]
            removed_policies = kwargs["removed_policies"]
            policies = kwargs["policies"]
            cmd = doc.build_policy_update_cmd(script_path, use_sh=False,
                    msg_type="policies",
                    updated_policies=updated_policies,
                    removed_policies=removed_policies,
                    policies=policies
                    )

            docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
            docker_host_ip = _lookup_service(docker_host)
            logins = _get_docker_logins()
            client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)

            container_id = kwargs["container_id"]

            doc.notify_for_policy_update(client, container_id, cmd)
            """
    # else the default is no trigger

    return kwargs


@monkeypatch_loggers
@Policies.update_policies_on_node()
@operation
def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
    """Policy update task

    This method is responsible for updating the application configuration and
    notifying the applications that the change has occurred. This is to be used
    for the dcae.interfaces.policy.policy_update operation.

    :updated_policies: contains the list of changed policy-configs when configs_only=True
        (default). Use configs_only=False to bring the full policy objects into :updated_policies:.
    """
    update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
    update_inputs["updated_policies"] = updated_policies
    update_inputs["removed_policies"] = removed_policies
    update_inputs["policies"] = policies

    _notify_container(**update_inputs)
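
# Illustrative example (hypothetical values): a policy change might arrive as
#   updated_policies = [{"policy_id": "p1", "config": {"threshold": 5}}]
# policy_update merges these with the instance's runtime properties and hands
# them to _notify_container; in the k8s case this is currently effectively a
# no-op (see the comments there), since in ONAP R2 components poll for
# configuration and policy changes.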