# ============LICENSE_START=======================================================
# ================================================================================
# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2020 Pantheon.tech. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

# Lifecycle interface calls for containerized components

# Needed by Cloudify Manager to load google.auth for the Kubernetes python client
from . import cloudify_importer

import time, copy
import json

from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
from onap_dcae_dcaepolicy_lib import Policies
from k8splugin import discovery as dis
from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
    merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
from k8splugin.exceptions import DockerPluginDeploymentError
from k8splugin import utils
from configure import configure
import k8sclient
plugin_conf = configure.configure()
CONSUL_HOST = plugin_conf.get("consul_host")
CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
DCAE_NAMESPACE = plugin_conf.get("namespace")
DEFAULT_MAX_WAIT = plugin_conf.get("max_wait", 1800)
DEFAULT_K8S_LOCATION = plugin_conf.get("default_k8s_location")
COMPONENT_CA_CERT_PATH = plugin_conf.get("tls").get("component_ca_cert_path")
CBS_BASE_URL = plugin_conf.get("cbs").get("base_url")
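# For reference, a minimal illustrative shape of the plugin configuration read
# above. The keys are inferred from the .get() calls; the values here are
# hypothetical and come from the plugin's configure module in a real deployment:
#   {
#     "consul_host": "consul:8500",
#     "consul_dns_name": "consul",
#     "namespace": "dcae",
#     "max_wait": 1800,
#     "default_k8s_location": "central",
#     "tls": {"component_ca_cert_path": "/opt/dcae/cacert/cacert.pem"},
#     "cbs": {"base_url": "https://config-binding-service:10443/service_component_all"}
#   }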

# Used to construct delivery urls for data router subscribers. Data router in FTL
# requires https, but this author believes that ONAP is to default to http.
DEFAULT_SCHEME = "http"

SERVICE_COMPONENT_NAME = "service_component_name"
CONTAINER_ID = "container_id"
APPLICATION_CONFIG = "application_config"
K8S_DEPLOYMENT = "k8s_deployment"
RESOURCE_KW = "resource_config"
LOCATION_ID = "location_id"


# Lifecycle interface calls for dcae.nodes.DockerContainer

def _setup_for_discovery(**kwargs):
    """Setup for config discovery"""
    try:
        name = kwargs['name']
        application_config = kwargs[APPLICATION_CONFIG]

        # NOTE: application_config is no longer a json string and is input as a
        # YAML map, which translates to a dict. We don't have to do any
        # preprocessing anymore.
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.push_service_component_config(conn, name, application_config)
        return kwargs
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)
    except Exception as e:
        ctx.logger.error("Unexpected error while pushing configuration: {0}"
                .format(str(e)))
        raise NonRecoverableError(e)


def _generate_component_name(**kwargs):
    """Generate component name"""
    service_component_type = kwargs['service_component_type']
    name_override = kwargs['service_component_name_override']

    kwargs['name'] = name_override if name_override \
        else dis.generate_service_component_name(service_component_type)
    return kwargs


def _done_for_create(**kwargs):
    """Wrap up create operation"""
    name = kwargs['name']
    kwargs[SERVICE_COMPONENT_NAME] = name
    # All updates to the runtime_properties happen here. I don't see a reason
    # why we shouldn't do this because the context is not being mutated by
    # something else, and it keeps the other functions pure (pure in the sense
    # of not dealing with CloudifyContext).
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done setting up: {0}".format(name))
    return kwargs


def _get_resources(**kwargs):
    if kwargs is not None:
        ctx.logger.debug("{0}: {1}".format(RESOURCE_KW, kwargs.get(RESOURCE_KW)))
        return kwargs.get(RESOURCE_KW)
    ctx.logger.info("set resources to None")
    return None


def _get_location():
    ''' Get the k8s location property. Set to the default if the property is missing, None, or zero-length '''
    return ctx.node.properties["location_id"] if "location_id" in ctx.node.properties and ctx.node.properties["location_id"] \
        else DEFAULT_K8S_LOCATION


@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components(**create_inputs):
    """Create step for service components

    This interface is responsible for:

    1. Generating service component name
    2. Populating config information into Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **_enhance_docker_params(
                    **_generate_component_name(
                        **create_inputs))))


def _parse_streams(**kwargs):
    """Parse streams and setup for DMaaP plugin"""
    # The DMaaP plugin requires this plugin to set the runtime properties
    # keyed by the node name.
    for stream in kwargs["streams_publishes"]:
        kwargs[stream["name"]] = stream

    for stream in kwargs["streams_subscribes"]:
        if stream["type"] == "data_router":

            # Don't want to mutate the source
            stream = copy.deepcopy(stream)

            # Set up the delivery URL.
            # Using service_component_name as the host name in the subscriber URL
            # will work in a single-cluster ONAP deployment. Whether it will also work
            # in a multi-cluster ONAP deployment--with a central location and one or
            # more remote ("edge") locations--depends on how networking and DNS are set
            # up in a multi-cluster deployment.
            service_component_name = kwargs["name"]
            ports, _ = k8sclient.parse_ports(kwargs["ports"])
            # Deliver to the first container port in the list
            (dport, _) = ports[0]
            subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)

            scheme = stream.get("scheme", DEFAULT_SCHEME)
            if "route" not in stream:
                raise NonRecoverableError("'route' key missing from data router subscriber")
            path = stream["route"]
            stream["delivery_url"] = "{scheme}://{host}/{path}".format(
                scheme=scheme, host=subscriber_host, path=path)
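            # Illustrative result (hypothetical values): with name "s123-sub",
            # first container port 3905, and route "events", the delivery URL
            # becomes "http://s123-sub:3905/events".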

            # If a username and password have not been provided, then generate them. The
            # DMaaP plugin doesn't generate them for subscribers. The generation code
            # and the lengths of the username and password have been lifted from the
            # DMaaP plugin.
            if not stream.get("username", None):
                stream["username"] = utils.random_string(8)
            if not stream.get("password", None):
                stream["password"] = utils.random_string(10)

            kwargs[stream["name"]] = stream

    return kwargs


@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components_with_streams(**create_inputs):
    """Create step for service components that use DMaaP

    This interface is responsible for:

    1. Generating service component name
    2. Setting up runtime properties for the DMaaP plugin
    3. Populating application config into Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **_parse_streams(
                    **_enhance_docker_params(
                        **_generate_component_name(
                            **create_inputs)))))


@merge_inputs_for_create
@monkeypatch_loggers
@operation
def create_for_platforms(**create_inputs):
    """Create step for platform components

    This interface is responsible for:

    1. Populating config information into Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **create_inputs))


def _verify_k8s_deployment(location, service_component_name, max_wait):
    """Verify that the k8s Deployment is ready

    Args:
    -----
    location (string): location of the k8s cluster where the component was deployed
    service_component_name: component's service component name
    max_wait (integer): limit to how many attempts to make, which translates to
        seconds because each sleep is one second. 0 means infinite.

    Return:
    -------
    True if deployment is ready within the maximum wait time, False otherwise
    """
    num_attempts = 1

    while True:
        if k8sclient.is_available(location, DCAE_NAMESPACE, service_component_name):
            return True
        else:
            num_attempts += 1

            if max_wait > 0 and max_wait < num_attempts:
                return False

            time.sleep(1)


def _create_and_start_container(container_name, image, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, a k8s Service or two.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    kwargs may have:
        - volumes: array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}}
        - ports: array of strings in the form "container_port:host_port"
        - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
        - always_pull: boolean. If true, sets image pull policy to "Always"
          so that a fresh copy of the image is always pulled. Otherwise, sets
          image pull policy to "IfNotPresent".
        - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - tls_info: an object with information for setting up the component to act as a TLS server, with the form:
            {"use_tls" : true_or_false, "cert_directory": "/path/to/directory_where_certs_should_be_placed" }
        - replicas: number of replicas to be launched initially
        - readiness: object with information needed to create a readiness check
        - liveness: object with information needed to create a liveness check
        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
    '''
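    # Illustrative call (hypothetical names and values, not taken from any blueprint):
    #   _create_and_start_container("s123-example", "example.repo/example-image:1.0.0",
    #                               ports=["8080:0"], envs={"MODE": "dev"},
    #                               replicas=1, k8s_location="central")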
    tls_info = kwargs.get("tls_info")
    env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
            "CONFIG_BINDING_SERVICE": "config-binding-service",
            "DCAE_CA_CERTPATH" : "{0}/cacert.pem".format(tls_info["cert_directory"]) if (tls_info and tls_info["cert_directory"]) else COMPONENT_CA_CERT_PATH,
            "CBS_CONFIG_URL" : "{0}/{1}".format(CBS_BASE_URL, container_name)
          }
    env.update(kwargs.get("envs", {}))
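    # For example (illustrative values): a component named "s123-example" would see
    # CONSUL_HOST set to the Consul DNS name, CONFIG_BINDING_SERVICE="config-binding-service",
    # DCAE_CA_CERTPATH pointing either at "<cert_directory>/cacert.pem" (when TLS is
    # configured) or at the plugin-wide CA cert path, and
    # CBS_CONFIG_URL="<cbs base_url>/s123-example".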
    ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
    ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
    replicas = kwargs.get("replicas", 1)
    resource_config = _get_resources(**kwargs)
    _, dep = k8sclient.deploy(DCAE_NAMESPACE,
                              container_name,
                              image,
                              replicas=replicas,
                              always_pull=kwargs.get("always_pull_image", False),
                              k8sconfig=plugin_conf,
                              resources=resource_config,
                              volumes=kwargs.get("volumes", []),
                              ports=kwargs.get("ports", []),
                              msb_list=kwargs.get("msb_list"),
                              tls_info=kwargs.get("tls_info"),
                              env=env,
                              labels=kwargs.get("labels", {}),
                              log_info=kwargs.get("log_info"),
                              readiness=kwargs.get("readiness"),
                              liveness=kwargs.get("liveness"),
                              k8s_location=kwargs.get("k8s_location"))

    # Capture the result of deployment for future use
    ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep
    kwargs[K8S_DEPLOYMENT] = dep
    ctx.instance.runtime_properties["replicas"] = replicas
    ctx.logger.info("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))
    return kwargs


def _parse_cloudify_context(**kwargs):
    """Parse Cloudify context

    Extract what is needed. This is an impure function because it requires ctx.
    """
    kwargs["deployment_id"] = ctx.deployment.id

    # Set some labels for the Kubernetes pods
    # The name segment is required and must be 63 characters or less
    kwargs["labels"] = {
        "cfydeployment" : ctx.deployment.id,
        "cfynode": ctx.node.name[:63],
        "cfynodeinstance": ctx.instance.id[:63]
    }

    # Pick up the centralized logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up TLS info if present
    if "tls_info" in ctx.node.properties:
        kwargs["tls_info"] = ctx.node.properties["tls_info"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    # Pick up the Kubernetes location where the component should run
    kwargs["k8s_location"] = _get_location()

    return kwargs


def _enhance_docker_params(**kwargs):
    '''
    Set up Docker environment variables and readiness/liveness check info
    and inject them into kwargs.
    '''
    # Get info for setting up readiness/liveness probe, if present
    docker_config = kwargs.get("docker_config", {})
    if "healthcheck" in docker_config:
        kwargs["readiness"] = docker_config["healthcheck"]
    if "livehealthcheck" in docker_config:
        kwargs["liveness"] = docker_config["livehealthcheck"]

    envs = kwargs.get("envs", {})

    # Set tags on this component for its Consul registration as a service
    tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
    tags = [str(tag) for tag in tags if tag is not None]
    # Registrator will use this to register this component with tags. Must be
    # a comma-delimited string.
    envs["SERVICE_TAGS"] = ",".join(tags)

    kwargs["envs"] = envs
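    # For example (illustrative): deployment_id "dep1" and service_id "svc1"
    # yield envs["SERVICE_TAGS"] == "dep1,svc1".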

    def combine_params(key, docker_config, kwargs):
        v = docker_config.get(key, []) + kwargs.get(key, [])
        if v:
            kwargs[key] = v
        return kwargs
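    # For example (illustrative): docker_config {"ports": ["80:0"]} combined with
    # kwargs {"ports": ["8080:0"]} gives kwargs["ports"] == ["80:0", "8080:0"];
    # the lists are concatenated as-is, with no deduplication.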

    # Add the lists of ports and volumes unintelligently - meaning just add the
    # lists together with no deduping.
    kwargs = combine_params("ports", docker_config, kwargs)
    kwargs = combine_params("volumes", docker_config, kwargs)

    return kwargs


def _create_and_start_component(**kwargs):
    """Create and start component (container)"""
    image = kwargs["image"]
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    # Need to be picky and manually select out pieces because just using kwargs,
    # which contains everything, confused the execution of
    # _create_and_start_container because duplicate variables exist
    sub_kwargs = {
        "volumes": kwargs.get("volumes", []),
        "ports": kwargs.get("ports", None),
        "envs": kwargs.get("envs", {}),
        "log_info": kwargs.get("log_info", {}),
        "tls_info": kwargs.get("tls_info", {}),
        "labels": kwargs.get("labels", {}),
        "resource_config": kwargs.get("resource_config", {}),
        "readiness": kwargs.get("readiness", {}),
        "liveness": kwargs.get("liveness", {}),
        "k8s_location": kwargs.get("k8s_location")}
    returned_args = _create_and_start_container(service_component_name, image, **sub_kwargs)
    kwargs[K8S_DEPLOYMENT] = returned_args[K8S_DEPLOYMENT]

    return kwargs


def _verify_component(**kwargs):
    """Verify deployment is ready"""
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]

    max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
    ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))

    if _verify_k8s_deployment(kwargs.get("k8s_location"), service_component_name, max_wait):
        ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
        return kwargs

    # The component did not become ready within the "max_wait" interval.
    # Delete the k8s components created already and remove configuration from Consul.
    ctx.logger.error("k8s deployment never became ready for {0}".format(service_component_name))
    if (K8S_DEPLOYMENT in kwargs) and (len(kwargs[K8S_DEPLOYMENT]["deployment"]) > 0):
        ctx.logger.info("attempting to delete k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
        k8sclient.undeploy(kwargs[K8S_DEPLOYMENT])
        ctx.logger.info("deleted k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
    cleanup_discovery(**kwargs)
    raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))


def _done_for_start(**kwargs):
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
    return kwargs


def _setup_msb_registration(service_name, msb_reg):
    return {
        "serviceName" : service_name,
        "port" : msb_reg.get("port", "80"),
        "version" : msb_reg.get("version", "v1"),
        "url" : msb_reg.get("url_path", "/v1"),
        "protocol" : "REST",
        "enable_ssl" : msb_reg.get("uses_ssl", False),
        "visualRange" : "1"
    }
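# Illustrative result (hypothetical inputs): _setup_msb_registration("inventory",
# {"port": "8080"}) returns {"serviceName": "inventory", "port": "8080",
# "version": "v1", "url": "/v1", "protocol": "REST", "enable_ssl": False,
# "visualRange": "1"}.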


@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
    """Initiate Kubernetes deployment for service components

    This operation method is to be used with the ContainerizedServiceComponent
    node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
    that the app is up and responding successfully to readiness probes.
    """
    _done_for_start(
            **_verify_component(
                **_create_and_start_component(
                    **_parse_cloudify_context(**start_inputs))))


@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_platforms(**kwargs):
    """Initiate Kubernetes deployment for platform components

    This operation method is to be used with the ContainerizedPlatformComponent
    node type.
    """
    # Capture node properties
    image = ctx.node.properties["image"]
    docker_config = ctx.node.properties.get("docker_config", {})
    resource_config = ctx.node.properties.get("resource_config", {})
    kwargs["resource_config"] = resource_config
    if "healthcheck" in docker_config:
        kwargs["readiness"] = docker_config["healthcheck"]
    if "livehealthcheck" in docker_config:
        kwargs["liveness"] = docker_config["livehealthcheck"]
    if "dns_name" in ctx.node.properties:
        service_component_name = ctx.node.properties["dns_name"]
    else:
        service_component_name = ctx.node.properties["name"]

    # Set some labels for the Kubernetes pods
    # The name segment is required and must be 63 characters or less
    kwargs["labels"] = {
        "cfydeployment" : ctx.deployment.id,
        "cfynode": ctx.node.name[:63],
        "cfynodeinstance": ctx.instance.id[:63]
    }

    host_port = ctx.node.properties["host_port"]
    container_port = ctx.node.properties["container_port"]

    # Cloudify properties are all required, and Cloudify complains that None
    # is not a valid type for integer. Defaulting to 0 indicates not to
    # use this and not to set a specific port mapping in cases like service
    # change handling.
    if container_port != 0:
        # Doing this because other nodes might want to use this property
        port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
        ports = kwargs.get("ports", []) + [port_mapping]
        kwargs["ports"] = ports
    if "ports" not in kwargs:
        ctx.logger.warn("No port mappings defined. Will randomly assign port.")
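    # For example (illustrative): container_port 8443 and host_port 30226 produce
    # the mapping string "8443:30226"; container_port 8443 with host_port 0
    # ("8443:0") exposes the container port without pinning a specific host port.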

    # All of the new node properties could be handled more DRYly!
    # If a registration to MSB is required, then set up the registration info
    if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
        kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]

    # If centralized logging via ELK is desired, then set up the logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up TLS info if present
    if "tls_info" in ctx.node.properties:
        kwargs["tls_info"] = ctx.node.properties["tls_info"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    # Pick up the Kubernetes location where the component should run
    kwargs["k8s_location"] = _get_location()

    returned_args = _create_and_start_container(service_component_name, image, **kwargs)

    # Verify that the k8s deployment is ready
    # - Set service component name into kwargs
    # - max_wait is already in kwargs if it was set
    returned_args[SERVICE_COMPONENT_NAME] = service_component_name
    _verify_component(**returned_args)


@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
    """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type"""
    service_component_name = ctx.node.properties["name"]
    ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name

    image = ctx.node.properties["image"]
    kwargs["k8s_location"] = _get_location()

    _create_and_start_container(service_component_name, image, **kwargs)


@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
    """Delete Kubernetes deployment"""
    if K8S_DEPLOYMENT in ctx.instance.runtime_properties:
        try:
            deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
            k8sclient.undeploy(deployment_description)
        except Exception as e:
            ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
                    .format(str(e)))
    else:
        # A previous install workflow may have failed,
        # and no Kubernetes deployment info was recorded in runtime_properties.
        # No need to run the undeploy operation.
        ctx.logger.info("No k8s deployment information, not attempting to delete k8s deployment")


@wrap_error_handling_update
@monkeypatch_loggers
@operation
def scale(replicas, **kwargs):
    """Change number of replicas in the deployment"""
    service_component_name = ctx.instance.runtime_properties["service_component_name"]
    if replicas > 0:
        current_replicas = ctx.instance.runtime_properties["replicas"]
        ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        k8sclient.scale(deployment_description, replicas)
        ctx.instance.runtime_properties["replicas"] = replicas

        # Verify that the scaling took place as expected
        max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
        ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
        if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
            ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
    else:
        ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))


@wrap_error_handling_update
@monkeypatch_loggers
@operation
def update_image(image, **kwargs):
    """Restart the component with a new Docker image"""
    service_component_name = ctx.instance.runtime_properties["service_component_name"]
    if image:
        current_image = ctx.instance.runtime_properties["image"]
        ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        k8sclient.upgrade(deployment_description, image)
        ctx.instance.runtime_properties["image"] = image

        # Verify that the update took place as expected
        max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
        ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
        if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
            ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
    else:
        ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))

# TODO: implement rollback operation when kubernetes python client fix is available.
# (See comments in k8sclient.py.)
# In the meantime, it's possible to undo an update_image operation by doing a second
# update_image that specifies the older image.


@monkeypatch_loggers
@Policies.cleanup_policies_on_node
@operation
def cleanup_discovery(**kwargs):
    """Delete configuration from Consul"""
    if SERVICE_COMPONENT_NAME in ctx.instance.runtime_properties:
        service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]

        try:
            conn = dis.create_kv_conn(CONSUL_HOST)
            dis.remove_service_component_config(conn, service_component_name)
        except dis.DiscoveryConnectionError as e:
            raise RecoverableError(e)
    else:
        # When another node in the blueprint fails install,
        # this node may not have generated a service component name.
        # There's nothing to delete from Consul.
        ctx.logger.info("No service_component_name, not attempting to delete config from Consul")


def _notify_container(**kwargs):
    """
    Notify the container using the policy section in the docker_config.
    Notification consists of running a script in the application container
    in each pod in the Kubernetes deployment associated with this node.
    Return the list of notification results.
    """
    dc = kwargs["docker_config"]
    resp = []

    if "policy" in dc and dc["policy"].get("trigger_type") == "docker":
        # Build the command to execute in the container:
        # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...}
        script_path = dc["policy"]["script_path"]
        policy_data = {
            "policies": kwargs["policies"],
            "updated_policies": kwargs["updated_policies"],
            "removed_policies": kwargs["removed_policies"]
        }

        command = [script_path, "policies", json.dumps(policy_data)]
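        # For example (hypothetical script path): with script_path
        # "/opt/app/reconfigure.sh", the command run in each pod looks like
        # ["/opt/app/reconfigure.sh", "policies", '{"policies": [...], ...}'].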

        # Execute the command
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        resp = k8sclient.execute_command_in_deployment(deployment_description, command)

    # else the default is no trigger

    return resp


@monkeypatch_loggers
@Policies.update_policies_on_node()
@operation
def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
    """Policy update task

    This method is responsible for updating the application configuration and
    notifying the applications that the change has occurred. This is to be used
    for the dcae.interfaces.policy.policy_update operation.

    :updated_policies: contains the list of changed policy-configs when configs_only=True
        (default). Use configs_only=False to bring the full policy objects in :updated_policies:.
    """
    service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
    ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
            .format(service_component_name, updated_policies, removed_policies, policies))
    update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
    update_inputs["updated_policies"] = updated_policies
    update_inputs["removed_policies"] = removed_policies
    update_inputs["policies"] = policies

    resp = _notify_container(**update_inputs)
    ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name, json.dumps(resp)))