50087fb64205802c05618a055436a0f826209241
[dcaegen2/platform/plugins.git] / k8s / k8splugin / tasks.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20
21 # Lifecycle interface calls for containerized components
22
23 # Needed by Cloudify Manager to load google.auth for the Kubernetes python client
24 import cloudify_importer
25
26 import time, copy
27 from cloudify import ctx
28 from cloudify.decorators import operation
29 from cloudify.exceptions import NonRecoverableError, RecoverableError
30 from onap_dcae_dcaepolicy_lib import Policies
31 from k8splugin import discovery as dis
32 from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
33     merge_inputs_for_start, merge_inputs_for_create
34 from k8splugin.exceptions import DockerPluginDeploymentError
35 from k8splugin import utils
36 from configure import configure
37 from k8sclient import k8sclient
38
39 # Get configuration
40 plugin_conf = configure.configure()
41 CONSUL_HOST = plugin_conf.get("consul_host")
42 CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
43 DCAE_NAMESPACE = plugin_conf.get("namespace")
44
45 # Used to construct delivery urls for data router subscribers. Data router in FTL
46 # requires https but this author believes that ONAP is to be defaulted to http.
47 DEFAULT_SCHEME = "http"
48
49 # Property keys
50 SERVICE_COMPONENT_NAME = "service_component_name"
51 CONTAINER_ID = "container_id"
52 APPLICATION_CONFIG = "application_config"
53
54
55
56 # Utility methods
57
58 # Lifecycle interface calls for dcae.nodes.DockerContainer
59
60 def _setup_for_discovery(**kwargs):
61     """Setup for config discovery"""
62     try:
63         name = kwargs['name']
64         application_config = kwargs[APPLICATION_CONFIG]
65
66         # NOTE: application_config is no longer a json string and is inputed as a
67         # YAML map which translates to a dict. We don't have to do any
68         # preprocessing anymore.
69         conn = dis.create_kv_conn(CONSUL_HOST)
70         dis.push_service_component_config(conn, name, application_config)
71         return kwargs
72     except dis.DiscoveryConnectionError as e:
73         raise RecoverableError(e)
74     except Exception as e:
75         ctx.logger.error("Unexpected error while pushing configuration: {0}"
76                 .format(str(e)))
77         raise NonRecoverableError(e)
78
79 def _generate_component_name(**kwargs):
80     """Generate component name"""
81     service_component_type = kwargs['service_component_type']
82     name_override = kwargs['service_component_name_override']
83
84     kwargs['name'] = name_override if name_override \
85             else dis.generate_service_component_name(service_component_type)
86     return kwargs
87
88 def _done_for_create(**kwargs):
89     """Wrap up create operation"""
90     name = kwargs['name']
91     kwargs[SERVICE_COMPONENT_NAME] = name
92     # All updates to the runtime_properties happens here. I don't see a reason
93     # why we shouldn't do this because the context is not being mutated by
94     # something else and will keep the other functions pure (pure in the sense
95     # not dealing with CloudifyContext).
96     ctx.instance.runtime_properties.update(kwargs)
97     ctx.logger.info("Done setting up: {0}".format(name))
98     return kwargs
99
100
101 @merge_inputs_for_create
102 @monkeypatch_loggers
103 @Policies.gather_policies_to_node()
104 @operation
105 def create_for_components(**create_inputs):
106     """Create step for Docker containers that are components
107
108     This interface is responsible for:
109
110     1. Generating service component name
111     2. Populating config information into Consul
112     """
113     _done_for_create(
114             **_setup_for_discovery(
115                 **_generate_component_name(
116                     **create_inputs)))
117
118
119 def _parse_streams(**kwargs):
120     """Parse streams and setup for DMaaP plugin"""
121     # The DMaaP plugin requires this plugin to set the runtime properties
122     # keyed by the node name.
123     def setup_publishes(s):
124         kwargs[s["name"]] = s
125
126     map(setup_publishes, kwargs["streams_publishes"])
127
128     def setup_subscribes(s):
129         if s["type"] == "data_router":
130             # If username and password has been provided then generate it. The
131             # DMaaP plugin doesn't generate for subscribers. The generation code
132             # and length of username password has been lifted from the DMaaP
133             # plugin.
134
135             # Don't want to mutate the source
136             s = copy.deepcopy(s)
137             if not s.get("username", None):
138                 s["username"] = utils.random_string(8)
139             if not s.get("password", None):
140                 s["password"] = utils.random_string(10)
141
142         kwargs[s["name"]] = s
143
144     # NOTE: That the delivery url is constructed and setup in the start operation
145     map(setup_subscribes, kwargs["streams_subscribes"])
146
147     return kwargs
148
149 def _setup_for_discovery_streams(**kwargs):
150     """Setup for discovery of streams
151
152     Specifically, there's a race condition this call addresses for data router
153     subscriber case. The component needs its feed subscriber information but the
154     DMaaP plugin doesn't provide this until after the docker plugin start
155     operation.
156     """
157     dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
158             if s["type"] == "data_router"]
159
160     if dr_subs:
161         dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
162         conn = dis.create_kv_conn(CONSUL_HOST)
163
164         def add_feed(dr_sub):
165             # delivery url and subscriber id will be fill by the dmaap plugin later
166             v = { "location": dr_sub["location"], "delivery_url": None,
167                     "username": dr_sub["username"], "password": dr_sub["password"],
168                     "subscriber_id": None }
169             return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None
170
171         try:
172             if all(map(add_feed, dr_subs)):
173                 return kwargs
174         except Exception as e:
175             raise NonRecoverableError(e)
176
177         # You should never get here
178         raise NonRecoverableError("Failure updating feed streams in Consul")
179     else:
180         return kwargs
181
182
183 @merge_inputs_for_create
184 @monkeypatch_loggers
185 @Policies.gather_policies_to_node()
186 @operation
187 def create_for_components_with_streams(**create_inputs):
188     """Create step for Docker containers that are components that use DMaaP
189
190     This interface is responsible for:
191
192     1. Generating service component name
193     2. Setup runtime properties for DMaaP plugin
194     3. Populating application config into Consul
195     4. Populating DMaaP config for data router subscribers in Consul
196     """
197     _done_for_create(
198             **_setup_for_discovery(
199                 **_setup_for_discovery_streams(
200                     **_parse_streams(
201                         **_generate_component_name(
202                             **create_inputs)))))
203
204
205 @merge_inputs_for_create
206 @monkeypatch_loggers
207 @operation
208 def create_for_platforms(**create_inputs):
209     """Create step for Docker containers that are platform components
210
211     This interface is responible for:
212
213     1. Populating config information into Consul
214     """
215     _done_for_create(
216             **_setup_for_discovery(
217                 **create_inputs))
218
219
220 def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
221         with_port=False):
222     conn = dis.create_kv_conn(consul_host)
223     results = dis.lookup_service(conn, service_component_name)
224
225     if with_port:
226         # Just grab first
227         result = results[0]
228         return "{address}:{port}".format(address=result["ServiceAddress"],
229                 port=result["ServicePort"])
230     else:
231         return results[0]["ServiceAddress"]
232
233
234 def _verify_container(service_component_name, max_wait):
235     """Verify that the container is healthy
236
237     Args:
238     -----
239     max_wait (integer): limit to how may attempts to make which translates to
240         seconds because each sleep is one second. 0 means infinite.
241
242     Return:
243     -------
244     True if component is healthy else a DockerPluginDeploymentError exception
245     will be raised.
246     """
247     num_attempts = 1
248     
249     while True:
250         if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
251             return True
252         else:
253             num_attempts += 1
254
255             if max_wait > 0 and max_wait < num_attempts:
256                 raise DockerPluginDeploymentError("Container never became healthy")
257
258             time.sleep(1)
259   
260     return True
261
262 def _create_and_start_container(container_name, image, **kwargs):
263     '''
264     This will create a k8s Deployment and, if needed, a k8s Service or two.
265     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
266     We're not exposing k8s to the component developer and the blueprint author.
267     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
268     the details from the component developer and the blueprint author.)
269     
270     kwargs may have:
271         - volumes:  array of volume objects, where a volume object is:
272             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
273         - ports: array of strings in the form "container_port:host_port"
274         - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
275         - always_pull: boolean.  If true, sets image pull policy to "Always"
276           so that a fresh copy of the image is always pull.  Otherwise, sets
277           image pull policy to "IfNotPresent"
278         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
279         - log_info: an object with info for setting up ELK logging, with the form:
280             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
281         - replicas: number of replicas to be launched initially
282         - readiness: object with information needed to create a readiness check
283     '''
284     env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
285             "CONFIG_BINDING_SERVICE": "config-binding-service" }
286     env.update(kwargs.get("envs", {}))
287     ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
288     ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
289     replicas = kwargs.get("replicas", 1)
290     _,dep = k8sclient.deploy(DCAE_NAMESPACE, 
291                      container_name, 
292                      image,
293                      replicas = replicas, 
294                      always_pull=kwargs.get("always_pull_image", False),
295                      k8sconfig=plugin_conf,
296                      volumes=kwargs.get("volumes",[]), 
297                      ports=kwargs.get("ports",[]),
298                      msb_list=kwargs.get("msb_list"), 
299                      env = env,
300                      labels = kwargs.get("labels", {}),
301                      log_info=kwargs.get("log_info"),
302                      readiness=kwargs.get("readiness"))
303
304     # Capture the result of deployment for future use 
305     ctx.instance.runtime_properties["k8s_deployment"] = dep
306     ctx.instance.runtime_properties["replicas"] = replicas
307     ctx.logger.info ("Deployment complete: {0}".format(dep))
308
309 def _parse_cloudify_context(**kwargs):
310     """Parse Cloudify context
311
312     Extract what is needed. This is impure function because it requires ctx.
313     """
314     kwargs["deployment_id"] = ctx.deployment.id
315
316     # Set some labels for the Kubernetes pods
317     kwargs["labels"] = {
318         "cfydeployment" : ctx.deployment.id,
319         "cfynode": ctx.node.name,
320         "cfynodeinstance": ctx.instance.id
321     }
322
323         # Pick up the centralized logging info
324     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
325         kwargs["log_info"] = ctx.node.properties["log_info"]
326
327     # Pick up replica count and always_pull_image flag
328     if "replicas" in ctx.node.properties:
329         kwargs["replicas"] = ctx.node.properties["replicas"]
330     if "always_pull_image" in ctx.node.properties:
331         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
332
333     return kwargs
334
335 def _enhance_docker_params(**kwargs):
336     '''
337     Set up Docker environment variables and readiness check info
338     and inject into kwargs.
339     '''
340     
341     # Get info for setting up readiness probe, if present
342     docker_config = kwargs.get("docker_config", {})
343     if "healthcheck" in docker_config:
344         kwargs["readiness"] = docker_config["healthcheck"] 
345
346     envs = kwargs.get("envs", {})
347
348     # Set tags on this component for its Consul registration as a service
349     tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
350     tags = [ str(tag) for tag in tags if tag is not None ]
351     # Registrator will use this to register this component with tags. Must be
352     # comma delimited.
353     envs["SERVICE_TAGS"] = ",".join(tags)
354
355     kwargs["envs"] = envs
356
357     def combine_params(key, docker_config, kwargs):
358         v = docker_config.get(key, []) + kwargs.get(key, [])
359         if v:
360             kwargs[key] = v
361         return kwargs
362
363     # Add the lists of ports and volumes unintelligently - meaning just add the
364     # lists together with no deduping.
365     kwargs = combine_params("ports", docker_config, kwargs)
366     kwargs = combine_params("volumes", docker_config, kwargs)
367     
368
369     return kwargs
370
371 def _create_and_start_component(**kwargs):
372     """Create and start component (container)"""
373     image = kwargs["image"]
374     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
375     # Need to be picky and manually select out pieces because just using kwargs
376     # which contains everything confused the execution of
377     # _create_and_start_container because duplicate variables exist
378     sub_kwargs = { 
379         "volumes": kwargs.get("volumes", []),
380         "ports": kwargs.get("ports", None),
381         "envs": kwargs.get("envs", {}), 
382         "log_info": kwargs.get("log_info", {}),
383         "labels": kwargs.get("labels", {}),
384         "readiness": kwargs.get("readiness",{})}
385     _create_and_start_container(service_component_name, image, **sub_kwargs)
386    
387     # TODO: Use regular logging here
388     ctx.logger.info("Container started: {0}".format(service_component_name))
389
390     return kwargs
391
392 def _verify_component(**kwargs):
393     """Verify component (container) is healthy"""
394     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
395     # TODO: "Consul doesn't make its first health check immediately upon registration.
396     # Instead it waits for the health check interval to pass."
397     # Possible enhancement is to read the interval (and possibly the timeout) from
398     # docker_config and multiply that by a number to come up with a more suitable
399     # max_wait.
400     
401     max_wait = kwargs.get("max_wait", 300)
402
403     # Verify that the container is healthy
404
405     if _verify_container(service_component_name, max_wait):
406         service_component_name = kwargs[SERVICE_COMPONENT_NAME]
407
408         # TODO: Use regular logging here
409         ctx.logger.info("Container is healthy: {0}".format(service_component_name))
410         
411     return kwargs
412
413 def _done_for_start(**kwargs):
414     ctx.instance.runtime_properties.update(kwargs)
415     ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
416     return kwargs
417
418 def _setup_msb_registration(service_name, msb_reg):
419     return {
420         "serviceName" : service_name,
421         "port" : msb_reg.get("port", "80"),
422         "version" : msb_reg.get("version", "v1"),
423         "url" : msb_reg.get("url_path", "/v1"),
424         "protocol" : "REST",
425         "enable_ssl" : msb_reg.get("uses_ssl", False),
426         "visualRange" : "1"
427 }
428
429 @wrap_error_handling_start
430 @merge_inputs_for_start
431 @monkeypatch_loggers
432 @operation
433 def create_and_start_container_for_components(**start_inputs):
434     """Create Docker container and start for components
435
436     This operation method is to be used with the DockerContainerForComponents
437     node type. After launching the container, the plugin will verify with Consul
438     that the app is up and healthy before terminating.
439     """
440     _done_for_start(
441             **_verify_component(
442                 **_create_and_start_component(
443                     **_enhance_docker_params(
444                         **_parse_cloudify_context(**start_inputs)))))
445
446
447 def _update_delivery_url(**kwargs):
448     """Update the delivery url for data router subscribers"""
449     dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
450             if s["type"] == "data_router"]
451
452     if dr_subs:
453         service_component_name = kwargs[SERVICE_COMPONENT_NAME]
454         # TODO: Should NOT be setting up the delivery url with ip addresses
455         # because in the https case, this will not work because data router does
456         # a certificate validation using the fqdn.
457         subscriber_host = _lookup_service(service_component_name, with_port=True)
458
459         for dr_sub in dr_subs:
460             scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
461             if "route" not in dr_sub:
462                 raise NonRecoverableError("'route' key missing from data router subscriber")
463             path = dr_sub["route"]
464             dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
465                     scheme=scheme, host=subscriber_host, path=path)
466             kwargs[dr_sub["name"]] = dr_sub
467
468     return kwargs
469
470 @wrap_error_handling_start
471 @merge_inputs_for_start
472 @monkeypatch_loggers
473 @operation
474 def create_and_start_container_for_components_with_streams(**start_inputs):
475     """Create Docker container and start for components that have streams
476
477     This operation method is to be used with the DockerContainerForComponents
478     node type. After launching the container, the plugin will verify with Consul
479     that the app is up and healthy before terminating.
480     """
481     _done_for_start(
482             **_update_delivery_url(
483                 **_verify_component(
484                     **_create_and_start_component(
485                         **_enhance_docker_params(
486                             **_parse_cloudify_context(**start_inputs))))))
487
488
489 @wrap_error_handling_start
490 @monkeypatch_loggers
491 @operation
492 def create_and_start_container_for_platforms(**kwargs):
493     """Create Docker container and start for platform services
494
495     This operation method is to be used with the ContainerizedPlatformComponent
496     node type.
497     """
498     # Capture node properties
499     image = ctx.node.properties["image"]
500     docker_config = ctx.node.properties.get("docker_config", {})
501     if "healthcheck" in docker_config:
502         kwargs["readiness"] = docker_config["healthcheck"] 
503     if "dns_name" in ctx.node.properties:
504         service_component_name = ctx.node.properties["dns_name"]
505     else:
506         service_component_name = ctx.node.properties["name"]
507
508     # Set some labels for the Kubernetes pods
509     kwargs["labels"] = {
510         "cfydeployment" : ctx.deployment.id,
511         "cfynode": ctx.node.name,
512         "cfynodeinstance": ctx.instance.id
513     }
514
515     host_port = ctx.node.properties["host_port"]
516     container_port = ctx.node.properties["container_port"]
517
518     # Cloudify properties are all required and Cloudify complains that None
519     # is not a valid type for integer. Defaulting to 0 to indicate to not
520     # use this and not to set a specific port mapping in cases like service
521     # change handler.
522     if container_port != 0:
523         # Doing this because other nodes might want to use this property
524         port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
525         ports = kwargs.get("ports", []) + [ port_mapping ]
526         kwargs["ports"] = ports
527     if "ports" not in kwargs:
528         ctx.logger.warn("No port mappings defined. Will randomly assign port.")
529
530     # All of the new node properties could be handled more DRYly!
531     # If a registration to MSB is required, then set up the registration info
532     if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
533         kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]
534
535     # If centralized logging via ELK is desired, then set up the logging info
536     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
537         kwargs["log_info"] = ctx.node.properties["log_info"]
538
539     # Pick up replica count and always_pull_image flag
540     if "replicas" in ctx.node.properties:
541         kwargs["replicas"] = ctx.node.properties["replicas"]
542     if "always_pull_image" in ctx.node.properties:
543         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
544     _create_and_start_container(service_component_name, image, **kwargs)
545
546     ctx.logger.info("Container started: {0}".format(service_component_name))
547
548     # Verify that the container is healthy
549
550     max_wait = kwargs.get("max_wait", 300)
551
552     if _verify_container(service_component_name, max_wait):
553         ctx.logger.info("Container is healthy: {0}".format(service_component_name))
554
555
556 @wrap_error_handling_start
557 @monkeypatch_loggers
558 @operation
559 def create_and_start_container(**kwargs):
560     """Create Docker container and start"""
561     service_component_name = ctx.node.properties["name"]
562     ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
563
564     image = ctx.node.properties["image"]
565
566     _create_and_start_container(service_component_name, image,**kwargs)
567     
568     ctx.logger.info("Component deployed: {0}".format(service_component_name))
569
570
571 @monkeypatch_loggers
572 @operation
573 def stop_and_remove_container(**kwargs):
574     """Stop and remove Docker container"""
575     try:
576         deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
577         k8sclient.undeploy(deployment_description)
578
579     except Exception as e:
580         ctx.logger.error("Unexpected error while stopping container: {0}"
581                 .format(str(e)))
582
583 @monkeypatch_loggers
584 @operation
585 def scale(replicas, **kwargs):
586     """Change number of replicas in the deployment"""
587     if replicas > 0:
588         current_replicas = ctx.instance.runtime_properties["replicas"]
589         ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas))
590         try:
591             deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
592             k8sclient.scale(deployment_description, replicas)
593             ctx.instance.runtime_properties["replicas"] = replicas
594         except Exception as e:
595             ctx.logger.error ("Unexpected error while scaling {0}".format(str(e)))
596     else:
597         ctx.logger.info("Ignoring request to scale to zero replicas")
598         
599 @monkeypatch_loggers
600 @Policies.cleanup_policies_on_node
601 @operation
602 def cleanup_discovery(**kwargs):
603     """Delete configuration from Consul"""
604     service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
605
606     try:
607         conn = dis.create_kv_conn(CONSUL_HOST)
608         dis.remove_service_component_config(conn, service_component_name)
609     except dis.DiscoveryConnectionError as e:
610         raise RecoverableError(e)
611
612
613 def _notify_container(**kwargs):
614     """Notify container using the policy section in the docker_config"""
615     dc = kwargs["docker_config"]
616
617     if "policy" in dc:
618         if dc["policy"]["trigger_type"] == "docker":
619             pass
620             """
621             Need replacement for this in kubernetes.
622             Need to find all the pods that have been deployed
623             and execute the script in them.
624             Kubernetes does not appear to have a way to ask for a script
625             to be executed in all of the currently running pods for a
626             Kubernetes Deployment or ReplicaSet.   We will have to find
627             each of them and run the script.   The problem is that set of
628             pods could be changing.   We can query to get all the pods, but
629             there's no guarantee the list won't change while we're trying to
630             execute the script.
631             
632             In ONAP R2, all of the policy-driven components rely on polling.
633             """
634             """
635             # REVIEW: Need to finalize on the docker config policy data structure
636             script_path = dc["policy"]["script_path"]
637             updated_policies = kwargs["updated_policies"]
638             removed_policies = kwargs["removed_policies"]
639             policies = kwargs["policies"]
640             cmd = doc.build_policy_update_cmd(script_path, use_sh=False,
641                     msg_type="policies",
642                     updated_policies=updated_policies,
643                     removed_policies=removed_policies,
644                     policies=policies
645                     )
646
647             docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
648             docker_host_ip = _lookup_service(docker_host)
649             logins = _get_docker_logins()
650             client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)
651
652             container_id = kwargs["container_id"]
653
654             doc.notify_for_policy_update(client, container_id, cmd)
655     """
656     # else the default is no trigger
657
658     return kwargs
659
660
661 @monkeypatch_loggers
662 @Policies.update_policies_on_node()
663 @operation
664 def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
665     """Policy update task
666
667     This method is responsible for updating the application configuration and
668     notifying the applications that the change has occurred. This is to be used
669     for the dcae.interfaces.policy.policy_update operation.
670
671     :updated_policies: contains the list of changed policy-configs when configs_only=True
672         (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
673     """
674     update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
675     update_inputs["updated_policies"] = updated_policies
676     update_inputs["removed_policies"] = removed_policies
677     update_inputs["policies"] = policies
678
679     _notify_container(**update_inputs)