1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
20 # Lifecycle interface calls for containerized components
# Needed by Cloudify Manager to load google.auth for the Kubernetes python client
from . import cloudify_importer

import copy
import json
import time

from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
from onap_dcae_dcaepolicy_lib import Policies
from k8splugin import discovery as dis
from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
    merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
from k8splugin.exceptions import DockerPluginDeploymentError
from k8splugin import utils
from configure import configure
import k8sclient
# Load the plugin configuration once at import time; every module-level
# setting below is read from this dict.
plugin_conf = configure.configure()
CONSUL_HOST = plugin_conf.get("consul_host")
CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
DCAE_NAMESPACE = plugin_conf.get("namespace")
DEFAULT_MAX_WAIT = plugin_conf.get("max_wait")
DEFAULT_K8S_LOCATION = plugin_conf.get("default_k8s_location")
COMPONENT_CERT_DIR = plugin_conf.get("tls",{}).get("component_cert_dir")
# NOTE(review): unlike the "tls" lookup above, this has no {} default, so a
# missing "cbs" key raises AttributeError at import time -- confirm intended.
CBS_BASE_URL = plugin_conf.get("cbs").get("base_url")

# Used to construct delivery urls for data router subscribers. Data router in FTL
# requires https but this author believes that ONAP is to be defaulted to http.
DEFAULT_SCHEME = "http"

# Keys used for runtime properties and for the kwargs dicts passed between
# the helper steps below.
SERVICE_COMPONENT_NAME = "service_component_name"
CONTAINER_ID = "container_id"
APPLICATION_CONFIG = "application_config"
K8S_DEPLOYMENT = "k8s_deployment"
RESOURCE_KW = "resource_config"
LOCATION_ID = "location_id"
63 # Lifecycle interface calls for dcae.nodes.DockerContainer
def _setup_for_discovery(**kwargs):
    """Setup for config discovery.

    Pushes the application configuration (under APPLICATION_CONFIG) into
    Consul keyed by kwargs["name"].  Returns kwargs so the create tasks can
    chain these helpers with ** unpacking.

    Raises RecoverableError on Consul connection problems (so Cloudify can
    retry) and NonRecoverableError on any other failure.
    """
    try:
        name = kwargs['name']
        application_config = kwargs[APPLICATION_CONFIG]
        # NOTE: application_config is no longer a json string and is inputed as a
        # YAML map which translates to a dict. We don't have to do any
        # preprocessing anymore.
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.push_service_component_config(conn, name, application_config)
        return kwargs
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)
    except Exception as e:
        ctx.logger.error("Unexpected error while pushing configuration: {0}"
                         .format(str(e)))
        raise NonRecoverableError(e)
84 def _generate_component_name(**kwargs):
85 """Generate component name"""
86 service_component_type = kwargs['service_component_type']
87 name_override = kwargs['service_component_name_override']
89 kwargs['name'] = name_override if name_override \
90 else dis.generate_service_component_name(service_component_type)
def _done_for_create(**kwargs):
    """Wrap up create operation.

    Copies the generated name under the SERVICE_COMPONENT_NAME key and
    persists all accumulated kwargs into the node instance's runtime
    properties.  Returns kwargs.
    """
    name = kwargs['name']
    kwargs[SERVICE_COMPONENT_NAME] = name
    # All updates to the runtime_properties happens here. I don't see a reason
    # why we shouldn't do this because the context is not being mutated by
    # something else and will keep the other functions pure (pure in the sense
    # not dealing with CloudifyContext).
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done setting up: {0}".format(name))
    return kwargs
def _get_resources(**kwargs):
    """Return the resource_config section from kwargs, or None.

    NOTE(review): with **kwargs this dict is never None, so the first branch
    is always taken; condition kept as-is to preserve behavior.
    """
    if kwargs is not None:
        ctx.logger.debug("{0}: {1}".format(RESOURCE_KW, kwargs.get(RESOURCE_KW)))
        return kwargs.get(RESOURCE_KW)
    ctx.logger.info("set resources to None")
    return None
def _get_location():
    ''' Get the k8s location property. Set to the default if the property is missing, None, or zero-length '''
    return ctx.node.properties["location_id"] if "location_id" in ctx.node.properties and ctx.node.properties["location_id"] \
        else DEFAULT_K8S_LOCATION
@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components(**create_inputs):
    """Create step for service components

    This interface is responsible for:

    1. Generating service component name
    2. Populating config information into Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **_enhance_docker_params(
                **_generate_component_name(
                    **create_inputs))))
def _parse_streams(**kwargs):
    """Parse streams and setup for DMaaP plugin.

    Publishers are copied into kwargs keyed by stream name.  Data router
    subscribers additionally get a delivery_url and generated credentials.
    Returns kwargs.
    """
    # The DMaaP plugin requires this plugin to set the runtime properties
    # keyed by the node name.
    for stream in kwargs["streams_publishes"]:
        kwargs[stream["name"]] = stream

    for stream in kwargs["streams_subscribes"]:
        if stream["type"] == "data_router":

            # Don't want to mutate the source
            stream = copy.deepcopy(stream)

            # Set up the delivery URL
            # Using service_component_name as the host name in the subscriber URL
            # will work in a single-cluster ONAP deployment. Whether it will also work
            # in a multi-cluster ONAP deployment--with a central location and one or
            # more remote ("edge") locations depends on how networking and DNS is set
            # up in a multi-cluster deployment
            service_component_name = kwargs["name"]
            ports, _ = k8sclient.parse_ports(kwargs["ports"])
            (dport, _) = ports[0]
            subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)

            scheme = stream.get("scheme", DEFAULT_SCHEME)
            if "route" not in stream:
                raise NonRecoverableError("'route' key missing from data router subscriber")
            path = stream["route"]
            stream["delivery_url"] = "{scheme}://{host}/{path}".format(
                scheme=scheme, host=subscriber_host, path=path)

            # If username and password has not been provided then generate it. The
            # DMaaP plugin doesn't generate for subscribers. The generation code
            # and length of username password has been lifted from the DMaaP
            # plugin.
            if not stream.get("username", None):
                stream["username"] = utils.random_string(8)
            if not stream.get("password", None):
                stream["password"] = utils.random_string(10)

            kwargs[stream["name"]] = stream

    return kwargs
@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@operation
def create_for_components_with_streams(**create_inputs):
    """Create step for service components that use DMaaP

    This interface is responsible for:

    1. Generating service component name
    2. Setup runtime properties for DMaaP plugin
    3. Populating application config into Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **_parse_streams(
                **_enhance_docker_params(
                    **_generate_component_name(
                        **create_inputs)))))
@merge_inputs_for_create
@monkeypatch_loggers
@operation
def create_for_platforms(**create_inputs):
    """Create step for platform components

    This interface is responsible for:

    1. Populating config information into Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **create_inputs))
def _verify_k8s_deployment(location, service_component_name, max_wait):
    """Verify that the k8s Deployment is ready

    Args:
    -----
    location (string): location of the k8s cluster where the component was deployed
    service_component_name: component's service component name
    max_wait (integer): limit to how may attempts to make which translates to
        seconds because each sleep is one second. 0 means infinite.

    Return:
    -------
    True if deployment is ready within the maximum wait time, False otherwise
    """
    num_attempts = 1

    # Poll once per second until the Deployment reports available, or until
    # the attempt budget is exhausted (max_wait == 0 polls forever).
    while True:
        if k8sclient.is_available(location, DCAE_NAMESPACE, service_component_name):
            return True
        else:
            num_attempts += 1

            if max_wait > 0 and max_wait < num_attempts:
                return False

            time.sleep(1)
def _create_and_start_container(container_name, image, **kwargs):
    """Deploy the component as a Kubernetes Deployment.

    This will create a k8s Deployment and, if needed, a k8s Service or two.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    Recognized keyword arguments:
    - volumes: array of volume objects, where a volume object is:
        {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
    - ports: array of strings in the form "container_port:host_port"
    - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
    - always_pull: boolean. If true, sets image pull policy to "Always"
        so that a fresh copy of the image is always pull. Otherwise, sets
        image pull policy to "IfNotPresent"
    - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
    - log_info: an object with info for setting up ELK logging, with the form:
        {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
    - tls_info: an object with information for setting up the component to act as a TLS server, with the form:
        {"use_tls" : true_or_false, "cert_directory": "/path/to/directory_where_certs_should_be_placed" }
    - replicas: number of replicas to be launched initially
    - readiness: object with information needed to create a readiness check
    - liveness: object with information needed to create a liveness check
    - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed

    Stores the resulting deployment description in runtime properties and in
    kwargs[K8S_DEPLOYMENT]; returns kwargs.
    """
    tls_info = kwargs.get("tls_info") or {}
    cert_dir = tls_info.get("cert_directory") or COMPONENT_CERT_DIR
    # Environment every component gets: how to reach Consul and CBS
    env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
            "CONFIG_BINDING_SERVICE": "config-binding-service",
            "DCAE_CA_CERTPATH" : "{0}/cacert.pem".format(cert_dir),
            "CBS_CONFIG_URL" : "{0}/{1}".format(CBS_BASE_URL, container_name)
          }
    env.update(kwargs.get("envs", {}))
    ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
    ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
    replicas = kwargs.get("replicas", 1)
    resource_config = _get_resources(**kwargs)
    _, dep = k8sclient.deploy(DCAE_NAMESPACE,
                              container_name,
                              image,
                              replicas=replicas,
                              always_pull=kwargs.get("always_pull_image", False),
                              k8sconfig=plugin_conf,
                              resources=resource_config,
                              volumes=kwargs.get("volumes", []),
                              ports=kwargs.get("ports", []),
                              msb_list=kwargs.get("msb_list"),
                              tls_info=kwargs.get("tls_info"),
                              env=env,
                              labels=kwargs.get("labels", {}),
                              log_info=kwargs.get("log_info"),
                              readiness=kwargs.get("readiness"),
                              liveness=kwargs.get("liveness"),
                              k8s_location=kwargs.get("k8s_location"))

    # Capture the result of deployment for future use
    ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep
    kwargs[K8S_DEPLOYMENT] = dep
    ctx.instance.runtime_properties["replicas"] = replicas
    ctx.logger.info ("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))
    return kwargs
def _parse_cloudify_context(**kwargs):
    """Parse Cloudify context

    Extract what is needed. This is impure function because it requires ctx.
    Returns kwargs.
    """
    kwargs["deployment_id"] = ctx.deployment.id

    # Set some labels for the Kubernetes pods
    # The name segment is required and must be 63 characters or less
    kwargs["labels"] = {
        "cfydeployment" : ctx.deployment.id,
        "cfynode": ctx.node.name[:63],
        "cfynodeinstance": ctx.instance.id[:63]
    }

    # Pick up the centralized logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up TLS info if present
    if "tls_info" in ctx.node.properties:
        kwargs["tls_info"] = ctx.node.properties["tls_info"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    # Set the k8s location for the node
    kwargs["k8s_location"] = _get_location()

    return kwargs
340 def _enhance_docker_params(**kwargs):
342 Set up Docker environment variables and readiness/liveness check info
343 and inject into kwargs.
346 # Get info for setting up readiness/liveness probe, if present
347 docker_config = kwargs.get("docker_config", {})
348 if "healthcheck" in docker_config:
349 kwargs["readiness"] = docker_config["healthcheck"]
350 if "livehealthcheck" in docker_config:
351 kwargs["liveness"] = docker_config["livehealthcheck"]
353 envs = kwargs.get("envs", {})
355 # Set tags on this component for its Consul registration as a service
356 tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
357 tags = [ str(tag) for tag in tags if tag is not None ]
358 # Registrator will use this to register this component with tags. Must be
360 envs["SERVICE_TAGS"] = ",".join(tags)
362 kwargs["envs"] = envs
364 def combine_params(key, docker_config, kwargs):
365 v = docker_config.get(key, []) + kwargs.get(key, [])
369 # Add the lists of ports and volumes unintelligently - meaning just add the
370 # lists together with no deduping.
371 kwargs = combine_params("ports", docker_config, kwargs)
372 kwargs = combine_params("volumes", docker_config, kwargs)
def _create_and_start_component(**kwargs):
    """Create and start component (container).  Returns kwargs."""
    image = kwargs["image"]
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    # Need to be picky and manually select out pieces because just using kwargs
    # which contains everything confused the execution of
    # _create_and_start_container because duplicate variables exist
    sub_kwargs = {
        "volumes": kwargs.get("volumes", []),
        "ports": kwargs.get("ports", None),
        "envs": kwargs.get("envs", {}),
        "log_info": kwargs.get("log_info", {}),
        "tls_info": kwargs.get("tls_info", {}),
        "labels": kwargs.get("labels", {}),
        "resource_config": kwargs.get("resource_config",{}),
        "readiness": kwargs.get("readiness",{}),
        "liveness": kwargs.get("liveness",{}),
        "k8s_location": kwargs.get("k8s_location")}
    returned_args = _create_and_start_container(service_component_name, image, **sub_kwargs)
    kwargs[K8S_DEPLOYMENT] = returned_args[K8S_DEPLOYMENT]

    return kwargs
def _verify_component(**kwargs):
    """Verify deployment is ready.

    Returns kwargs when the deployment becomes ready within max_wait;
    otherwise cleans up the created k8s artifacts and Consul config and
    raises DockerPluginDeploymentError.
    """
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]

    max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
    ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))

    if _verify_k8s_deployment(kwargs.get("k8s_location"), service_component_name, max_wait):
        ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
        return kwargs

    # The component did not become ready within the "max_wait" interval.
    # Delete the k8s components created already and remove configuration from Consul.
    ctx.logger.error("k8s deployment never became ready for {0}".format(service_component_name))
    if (K8S_DEPLOYMENT in kwargs) and (len(kwargs[K8S_DEPLOYMENT]["deployment"]) > 0):
        ctx.logger.info("attempting to delete k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
        k8sclient.undeploy(kwargs[K8S_DEPLOYMENT])
        ctx.logger.info("deleted k8s artifacts: {0}".format(kwargs[K8S_DEPLOYMENT]))
    cleanup_discovery(**kwargs)
    raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))
def _done_for_start(**kwargs):
    """Wrap up the start operation: persist state and log.  Returns kwargs."""
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
    return kwargs
427 def _setup_msb_registration(service_name, msb_reg):
429 "serviceName" : service_name,
430 "port" : msb_reg.get("port", "80"),
431 "version" : msb_reg.get("version", "v1"),
432 "url" : msb_reg.get("url_path", "/v1"),
434 "enable_ssl" : msb_reg.get("uses_ssl", False),
@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
    """Initiate Kubernetes deployment for service components

    This operation method is to be used with the ContainerizedServiceComponent
    node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
    that the app is up and responding successfully to readiness probes.
    """
    _done_for_start(
        **_verify_component(
            **_create_and_start_component(
                **_parse_cloudify_context(**start_inputs))))
@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_platforms(**kwargs):
    """Initiate Kubernetes deployment for platform components

    This operation method is to be used with the ContainerizedPlatformComponent
    node type.
    """
    # Capture node properties
    image = ctx.node.properties["image"]
    docker_config = ctx.node.properties.get("docker_config", {})
    resource_config = ctx.node.properties.get("resource_config", {})
    kwargs["resource_config"] = resource_config
    if "healthcheck" in docker_config:
        kwargs["readiness"] = docker_config["healthcheck"]
    if "livehealthcheck" in docker_config:
        kwargs["liveness"] = docker_config["livehealthcheck"]
    if "dns_name" in ctx.node.properties:
        service_component_name = ctx.node.properties["dns_name"]
    else:
        service_component_name = ctx.node.properties["name"]

    # Set some labels for the Kubernetes pods
    # The name segment is required and must be 63 characters or less
    kwargs["labels"] = {
        "cfydeployment" : ctx.deployment.id,
        "cfynode": ctx.node.name[:63],
        "cfynodeinstance": ctx.instance.id[:63]
    }

    host_port = ctx.node.properties["host_port"]
    container_port = ctx.node.properties["container_port"]

    # Cloudify properties are all required and Cloudify complains that None
    # is not a valid type for integer. Defaulting to 0 to indicate to not
    # use this and not to set a specific port mapping in cases like service
    # discovery registration.
    if container_port != 0:
        # Doing this because other nodes might want to use this property
        port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
        ports = kwargs.get("ports", []) + [ port_mapping ]
        kwargs["ports"] = ports
    if "ports" not in kwargs:
        ctx.logger.warn("No port mappings defined. Will randomly assign port.")

    # All of the new node properties could be handled more DRYly!
    # If a registration to MSB is required, then set up the registration info
    if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
        kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]

    # If centralized logging via ELK is desired, then set up the logging info
    if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
        kwargs["log_info"] = ctx.node.properties["log_info"]

    # Pick up TLS info if present
    if "tls_info" in ctx.node.properties:
        kwargs["tls_info"] = ctx.node.properties["tls_info"]

    # Pick up replica count and always_pull_image flag
    if "replicas" in ctx.node.properties:
        kwargs["replicas"] = ctx.node.properties["replicas"]
    if "always_pull_image" in ctx.node.properties:
        kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]

    # Set the k8s location for the node
    kwargs["k8s_location"] = _get_location()

    returned_args = _create_and_start_container(service_component_name, image, **kwargs)

    # Verify that the k8s deployment is ready
    # - Set service component name into kwargs
    # - max_wait is already in kwargs if it was set
    returned_args[SERVICE_COMPONENT_NAME] = service_component_name
    _verify_component(**returned_args)
@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
    """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type"""
    service_component_name = ctx.node.properties["name"]
    ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name

    image = ctx.node.properties["image"]
    kwargs["k8s_location"] = _get_location()

    _create_and_start_container(service_component_name, image, **kwargs)
@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
    """Delete Kubernetes deployment"""
    if K8S_DEPLOYMENT in ctx.instance.runtime_properties:
        try:
            deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
            k8sclient.undeploy(deployment_description)

        except Exception as e:
            # Best-effort delete: log and continue rather than failing the
            # uninstall workflow.
            ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
                             .format(str(e)))
    else:
        # A previous install workflow may have failed,
        # and no Kubernetes deployment info was recorded in runtime_properties.
        # No need to run the undeploy operation
        ctx.logger.info("No k8s deployment information, not attempting to delete k8s deployment")
@wrap_error_handling_update
@monkeypatch_loggers
@operation
def scale(replicas, **kwargs):
    """Change number of replicas in the deployment"""
    service_component_name = ctx.instance.runtime_properties["service_component_name"]

    if replicas > 0:
        current_replicas = ctx.instance.runtime_properties["replicas"]
        ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        k8sclient.scale(deployment_description, replicas)
        ctx.instance.runtime_properties["replicas"] = replicas

        # Verify that the scaling took place as expected
        max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
        ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
        if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
            ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))

    else:
        # Scaling to zero is not supported through this operation
        ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))
@wrap_error_handling_update
@monkeypatch_loggers
@operation
def update_image(image, **kwargs):
    """ Restart component with a new Docker image """
    if image:
        service_component_name = ctx.instance.runtime_properties["service_component_name"]

        current_image = ctx.instance.runtime_properties["image"]
        ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        k8sclient.upgrade(deployment_description, image)
        ctx.instance.runtime_properties["image"] = image

        # Verify that the update took place as expected
        max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
        ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
        if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
            ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))

    else:
        ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))
607 #TODO: implement rollback operation when kubernetes python client fix is available.
608 # (See comments in k8sclient.py.)
609 # In the meantime, it's possible to undo an update_image operation by doing a second
610 # update_image that specifies the older image.
@monkeypatch_loggers
@Policies.cleanup_policies_on_node
@operation
def cleanup_discovery(**kwargs):
    """Delete configuration from Consul"""
    if SERVICE_COMPONENT_NAME in ctx.instance.runtime_properties:
        service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]

        try:
            conn = dis.create_kv_conn(CONSUL_HOST)
            dis.remove_service_component_config(conn, service_component_name)
        except dis.DiscoveryConnectionError as e:
            # Consul unreachable: let Cloudify retry the operation
            raise RecoverableError(e)
    else:
        # When another node in the blueprint fails install,
        # this node may not have generated a service component name.
        # There's nothing to delete from Consul.
        ctx.logger.info ("No service_component_name, not attempting to delete config from Consul")
def _notify_container(**kwargs):
    """
    Notify container using the policy section in the docker_config.
    Notification consists of running a script in the application container
    in each pod in the Kubernetes deployment associated with this node.
    Return the list of notification results.
    """
    dc = kwargs["docker_config"]
    resp = []

    if "policy" in dc and dc["policy"].get("trigger_type") == "docker":
        # Build the command to execute in the container
        # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...}
        script_path = dc["policy"]["script_path"]
        policy_data = {
            "policies": kwargs["policies"],
            "updated_policies": kwargs["updated_policies"],
            "removed_policies": kwargs["removed_policies"]
        }

        command = [script_path, "policies", json.dumps(policy_data)]

        # Execute the command
        deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
        resp = k8sclient.execute_command_in_deployment(deployment_description, command)

    # else the default is no trigger

    return resp
@monkeypatch_loggers
@Policies.update_policies_on_node()
def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
    """Policy update task

    This method is responsible for updating the application configuration and
    notifying the applications that the change has occurred. This is to be used
    for the dcae.interfaces.policy.policy_update operation.

    :updated_policies: contains the list of changed policy-configs when configs_only=True
    (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
    """
    service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
    ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
                    .format(service_component_name, updated_policies, removed_policies, policies))
    update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
    update_inputs["updated_policies"] = updated_policies
    update_inputs["removed_policies"] = removed_policies
    update_inputs["policies"] = policies

    resp = _notify_container(**update_inputs)
    ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name, json.dumps(resp)))