Set up env variables correctly
[dcaegen2/platform/plugins.git] / k8s / k8splugin / tasks.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20
21 # Lifecycle interface calls for containerized components
22
23 # Needed by Cloudify Manager to load google.auth for the Kubernetes python client
24 import cloudify_importer
25
26 import time, copy
27 from cloudify import ctx
28 from cloudify.decorators import operation
29 from cloudify.exceptions import NonRecoverableError, RecoverableError
30 import dockering as doc
31 from onap_dcae_dcaepolicy_lib import Policies
32 from k8splugin import discovery as dis
33 from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
34     merge_inputs_for_start, merge_inputs_for_create
35 from k8splugin.exceptions import DockerPluginDeploymentError
36 from k8splugin import utils
37 from configure import configure
38 from k8sclient import k8sclient
39
40 # Get configuration
41 plugin_conf = configure.configure()
42 CONSUL_HOST = plugin_conf.get("consul_host")
43 CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
44 DCAE_NAMESPACE = plugin_conf.get("namespace")
45
46 # Used to construct delivery urls for data router subscribers. Data router in FTL
47 # requires https but this author believes that ONAP is to be defaulted to http.
48 DEFAULT_SCHEME = "http"
49
50 # Property keys
51 SERVICE_COMPONENT_NAME = "service_component_name"
52 CONTAINER_ID = "container_id"
53 APPLICATION_CONFIG = "application_config"
54
55
56
57 # Utility methods
58
59 # Lifecycle interface calls for dcae.nodes.DockerContainer
60
61 def _setup_for_discovery(**kwargs):
62     """Setup for config discovery"""
63     try:
64         name = kwargs['name']
65         application_config = kwargs[APPLICATION_CONFIG]
66
67         # NOTE: application_config is no longer a json string and is inputed as a
68         # YAML map which translates to a dict. We don't have to do any
69         # preprocessing anymore.
70         conn = dis.create_kv_conn(CONSUL_HOST)
71         dis.push_service_component_config(conn, name, application_config)
72         return kwargs
73     except dis.DiscoveryConnectionError as e:
74         raise RecoverableError(e)
75     except Exception as e:
76         ctx.logger.error("Unexpected error while pushing configuration: {0}"
77                 .format(str(e)))
78         raise NonRecoverableError(e)
79
80 def _generate_component_name(**kwargs):
81     """Generate component name"""
82     service_component_type = kwargs['service_component_type']
83     name_override = kwargs['service_component_name_override']
84
85     kwargs['name'] = name_override if name_override \
86             else dis.generate_service_component_name(service_component_type)
87     return kwargs
88
89 def _done_for_create(**kwargs):
90     """Wrap up create operation"""
91     name = kwargs['name']
92     kwargs[SERVICE_COMPONENT_NAME] = name
93     # All updates to the runtime_properties happens here. I don't see a reason
94     # why we shouldn't do this because the context is not being mutated by
95     # something else and will keep the other functions pure (pure in the sense
96     # not dealing with CloudifyContext).
97     ctx.instance.runtime_properties.update(kwargs)
98     ctx.logger.info("Done setting up: {0}".format(name))
99     return kwargs
100
101
102 @merge_inputs_for_create
103 @monkeypatch_loggers
104 @Policies.gather_policies_to_node()
105 @operation
106 def create_for_components(**create_inputs):
107     """Create step for Docker containers that are components
108
109     This interface is responsible for:
110
111     1. Generating service component name
112     2. Populating config information into Consul
113     """
114     _done_for_create(
115             **_setup_for_discovery(
116                 **_generate_component_name(
117                     **create_inputs)))
118
119
120 def _parse_streams(**kwargs):
121     """Parse streams and setup for DMaaP plugin"""
122     # The DMaaP plugin requires this plugin to set the runtime properties
123     # keyed by the node name.
124     def setup_publishes(s):
125         kwargs[s["name"]] = s
126
127     map(setup_publishes, kwargs["streams_publishes"])
128
129     def setup_subscribes(s):
130         if s["type"] == "data_router":
131             # If username and password has been provided then generate it. The
132             # DMaaP plugin doesn't generate for subscribers. The generation code
133             # and length of username password has been lifted from the DMaaP
134             # plugin.
135
136             # Don't want to mutate the source
137             s = copy.deepcopy(s)
138             if not s.get("username", None):
139                 s["username"] = utils.random_string(8)
140             if not s.get("password", None):
141                 s["password"] = utils.random_string(10)
142
143         kwargs[s["name"]] = s
144
145     # NOTE: That the delivery url is constructed and setup in the start operation
146     map(setup_subscribes, kwargs["streams_subscribes"])
147
148     return kwargs
149
150 def _setup_for_discovery_streams(**kwargs):
151     """Setup for discovery of streams
152
153     Specifically, there's a race condition this call addresses for data router
154     subscriber case. The component needs its feed subscriber information but the
155     DMaaP plugin doesn't provide this until after the docker plugin start
156     operation.
157     """
158     dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
159             if s["type"] == "data_router"]
160
161     if dr_subs:
162         dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
163         conn = dis.create_kv_conn(CONSUL_HOST)
164
165         def add_feed(dr_sub):
166             # delivery url and subscriber id will be fill by the dmaap plugin later
167             v = { "location": dr_sub["location"], "delivery_url": None,
168                     "username": dr_sub["username"], "password": dr_sub["password"],
169                     "subscriber_id": None }
170             return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None
171
172         try:
173             if all(map(add_feed, dr_subs)):
174                 return kwargs
175         except Exception as e:
176             raise NonRecoverableError(e)
177
178         # You should never get here
179         raise NonRecoverableError("Failure updating feed streams in Consul")
180     else:
181         return kwargs
182
183
184 @merge_inputs_for_create
185 @monkeypatch_loggers
186 @Policies.gather_policies_to_node()
187 @operation
188 def create_for_components_with_streams(**create_inputs):
189     """Create step for Docker containers that are components that use DMaaP
190
191     This interface is responsible for:
192
193     1. Generating service component name
194     2. Setup runtime properties for DMaaP plugin
195     3. Populating application config into Consul
196     4. Populating DMaaP config for data router subscribers in Consul
197     """
198     _done_for_create(
199             **_setup_for_discovery(
200                 **_setup_for_discovery_streams(
201                     **_parse_streams(
202                         **_generate_component_name(
203                             **create_inputs)))))
204
205
206 @merge_inputs_for_create
207 @monkeypatch_loggers
208 @operation
209 def create_for_platforms(**create_inputs):
210     """Create step for Docker containers that are platform components
211
212     This interface is responible for:
213
214     1. Populating config information into Consul
215     """
216     _done_for_create(
217             **_setup_for_discovery(
218                 **create_inputs))
219
220
221 def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
222         with_port=False):
223     conn = dis.create_kv_conn(consul_host)
224     results = dis.lookup_service(conn, service_component_name)
225
226     if with_port:
227         # Just grab first
228         result = results[0]
229         return "{address}:{port}".format(address=result["ServiceAddress"],
230                 port=result["ServicePort"])
231     else:
232         return results[0]["ServiceAddress"]
233
234
235 def _verify_container(service_component_name, max_wait):
236     """Verify that the container is healthy
237
238     Args:
239     -----
240     max_wait (integer): limit to how may attempts to make which translates to
241         seconds because each sleep is one second. 0 means infinite.
242
243     Return:
244     -------
245     True if component is healthy else a DockerPluginDeploymentError exception
246     will be raised.
247     """
248     num_attempts = 1
249     
250     while True:
251         if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
252             return True
253         else:
254             num_attempts += 1
255
256             if max_wait > 0 and max_wait < num_attempts:
257                 raise DockerPluginDeploymentError("Container never became healthy")
258
259             time.sleep(1)
260   
261     return True
262
263 def _create_and_start_container(container_name, image, **kwargs):
264     '''
265     This will create a k8s Deployment and, if needed, a k8s Service or two.
266     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
267     We're not exposing k8s to the component developer and the blueprint author.
268     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
269     the details from the component developer and the blueprint author.)
270     
271     kwargs may have:
272         - volumes:  array of volume objects, where a volume object is:
273             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
274         - ports: array of strings in the form "container_port:host_port"
275         - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
276         - always_pull: boolean.  If true, sets image pull policy to "Always"
277           so that a fresh copy of the image is always pull.  Otherwise, sets
278           image pull policy to "IfNotPresent"
279         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
280         - log_info: an object with info for setting up ELK logging, with the form:
281             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
282         - replicas: number of replicas to be launched initially
283     '''
284     env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
285             "CONFIG_BINDING_SERVICE": "config-binding-service" }
286     env.update(kwargs.get("envs", {}))
287     ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
288     ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
289     replicas = kwargs.get("replicas", 1)
290     _,dep = k8sclient.deploy(DCAE_NAMESPACE, 
291                      container_name, 
292                      image,
293                      replicas = replicas, 
294                      always_pull=kwargs.get("always_pull_image", False),
295                      k8sconfig=plugin_conf,
296                      volumes=kwargs.get("volumes",[]), 
297                      ports=kwargs.get("ports",[]),
298                      msb_list=kwargs.get("msb_list"), 
299                      env = env,
300                      labels = kwargs.get("labels", {}),
301                      log_info=kwargs.get("log_info"))
302
303     # Capture the result of deployment for future use 
304     ctx.instance.runtime_properties["k8s_deployment"] = dep
305     ctx.instance.runtime_properties["replicas"] = replicas
306     ctx.logger.info ("Deployment complete: {0}".format(dep))
307
308 def _parse_cloudify_context(**kwargs):
309     """Parse Cloudify context
310
311     Extract what is needed. This is impure function because it requires ctx.
312     """
313     kwargs["deployment_id"] = ctx.deployment.id
314
315     # Set some labels for the Kubernetes pods
316     kwargs["labels"] = {
317         "cfydeployment" : ctx.deployment.id,
318         "cfynode": ctx.node.name,
319         "cfynodeinstance": ctx.instance.id
320     }
321
322         # Pick up the centralized logging info
323     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
324         kwargs["log_info"] = ctx.node.properties["log_info"]
325
326     # Pick up replica count and always_pull_image flag
327     if "replicas" in ctx.node.properties:
328         kwargs["replicas"] = ctx.node.properties["replicas"]
329     if "always_pull_image" in ctx.node.properties:
330         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
331
332     return kwargs
333
334 def _enhance_docker_params(**kwargs):
335     """Setup Docker envs"""
336     docker_config = kwargs.get("docker_config", {})
337
338     envs = kwargs.get("envs", {})
339     # NOTE: Healthchecks are optional until prepared to handle use cases that
340     # don't necessarily use http
341     envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
342             if "healthcheck" in docker_config else {}
343     envs.update(envs_healthcheck)
344
345     # Set tags on this component for its Consul registration as a service
346     tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
347     tags = [ str(tag) for tag in tags if tag is not None ]
348     # Registrator will use this to register this component with tags. Must be
349     # comma delimited.
350     envs["SERVICE_TAGS"] = ",".join(tags)
351
352     kwargs["envs"] = envs
353
354     def combine_params(key, docker_config, kwargs):
355         v = docker_config.get(key, []) + kwargs.get(key, [])
356         if v:
357             kwargs[key] = v
358         return kwargs
359
360     # Add the lists of ports and volumes unintelligently - meaning just add the
361     # lists together with no deduping.
362     kwargs = combine_params("ports", docker_config, kwargs)
363     kwargs = combine_params("volumes", docker_config, kwargs)
364     
365
366     return kwargs
367
368 def _create_and_start_component(**kwargs):
369     """Create and start component (container)"""
370     image = kwargs["image"]
371     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
372     # Need to be picky and manually select out pieces because just using kwargs
373     # which contains everything confused the execution of
374     # _create_and_start_container because duplicate variables exist
375     sub_kwargs = { 
376         "volumes": kwargs.get("volumes", []),
377         "ports": kwargs.get("ports", None),
378         "envs": kwargs.get("envs", {}), 
379         "log_info": kwargs.get("log_info", {}),
380         "labels": kwargs.get("labels", {})}
381     _create_and_start_container(service_component_name, image, **sub_kwargs)
382    
383     # TODO: Use regular logging here
384     ctx.logger.info("Container started: {0}".format(service_component_name))
385
386     return kwargs
387
388 def _verify_component(**kwargs):
389     """Verify component (container) is healthy"""
390     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
391     # TODO: "Consul doesn't make its first health check immediately upon registration.
392     # Instead it waits for the health check interval to pass."
393     # Possible enhancement is to read the interval (and possibly the timeout) from
394     # docker_config and multiply that by a number to come up with a more suitable
395     # max_wait.
396     
397     max_wait = kwargs.get("max_wait", 300)
398
399     # Verify that the container is healthy
400
401     if _verify_container(service_component_name, max_wait):
402         service_component_name = kwargs[SERVICE_COMPONENT_NAME]
403
404         # TODO: Use regular logging here
405         ctx.logger.info("Container is healthy: {0}".format(service_component_name))
406         
407     return kwargs
408
409 def _done_for_start(**kwargs):
410     ctx.instance.runtime_properties.update(kwargs)
411     ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
412     return kwargs
413
414 def _setup_msb_registration(service_name, msb_reg):
415     return {
416         "serviceName" : service_name,
417         "port" : msb_reg.get("port", "80"),
418         "version" : msb_reg.get("version", "v1"),
419         "url" : msb_reg.get("url_path", "/v1"),
420         "protocol" : "REST",
421         "enable_ssl" : msb_reg.get("uses_ssl", False),
422         "visualRange" : "1"
423 }
424
425 @wrap_error_handling_start
426 @merge_inputs_for_start
427 @monkeypatch_loggers
428 @operation
429 def create_and_start_container_for_components(**start_inputs):
430     """Create Docker container and start for components
431
432     This operation method is to be used with the DockerContainerForComponents
433     node type. After launching the container, the plugin will verify with Consul
434     that the app is up and healthy before terminating.
435     """
436     _done_for_start(
437             **_verify_component(
438                 **_create_and_start_component(
439                     **_enhance_docker_params(
440                         **_parse_cloudify_context(**start_inputs)))))
441
442
443 def _update_delivery_url(**kwargs):
444     """Update the delivery url for data router subscribers"""
445     dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
446             if s["type"] == "data_router"]
447
448     if dr_subs:
449         service_component_name = kwargs[SERVICE_COMPONENT_NAME]
450         # TODO: Should NOT be setting up the delivery url with ip addresses
451         # because in the https case, this will not work because data router does
452         # a certificate validation using the fqdn.
453         subscriber_host = _lookup_service(service_component_name, with_port=True)
454
455         for dr_sub in dr_subs:
456             scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
457             if "route" not in dr_sub:
458                 raise NonRecoverableError("'route' key missing from data router subscriber")
459             path = dr_sub["route"]
460             dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
461                     scheme=scheme, host=subscriber_host, path=path)
462             kwargs[dr_sub["name"]] = dr_sub
463
464     return kwargs
465
466 @wrap_error_handling_start
467 @merge_inputs_for_start
468 @monkeypatch_loggers
469 @operation
470 def create_and_start_container_for_components_with_streams(**start_inputs):
471     """Create Docker container and start for components that have streams
472
473     This operation method is to be used with the DockerContainerForComponents
474     node type. After launching the container, the plugin will verify with Consul
475     that the app is up and healthy before terminating.
476     """
477     _done_for_start(
478             **_update_delivery_url(
479                 **_verify_component(
480                     **_create_and_start_component(
481                         **_enhance_docker_params(
482                             **_parse_cloudify_context(**start_inputs))))))
483
484
485 @wrap_error_handling_start
486 @monkeypatch_loggers
487 @operation
488 def create_and_start_container_for_platforms(**kwargs):
489     """Create Docker container and start for platform services
490
491     This operation method is to be used with the ContainerizedPlatformComponent
492     node type.
493     """
494     # Capture node properties
495     image = ctx.node.properties["image"]
496     docker_config = ctx.node.properties.get("docker_config", {})
497     if "dns_name" in ctx.node.properties:
498         service_component_name = ctx.node.properties["dns_name"]
499     else:
500         service_component_name = ctx.node.properties["name"]
501
502
503     envs = kwargs.get("envs", {})
504     # NOTE: Healthchecks are optional until prepared to handle use cases that
505     # don't necessarily use http
506     envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
507             if "healthcheck" in docker_config else {}
508     envs.update(envs_healthcheck)
509     kwargs["envs"] = envs
510
511     # Set some labels for the Kubernetes pods
512     kwargs["labels"] = {
513         "cfydeployment" : ctx.deployment.id,
514         "cfynode": ctx.node.name,
515         "cfynodeinstance": ctx.instance.id
516     }
517
518     host_port = ctx.node.properties["host_port"]
519     container_port = ctx.node.properties["container_port"]
520
521     # Cloudify properties are all required and Cloudify complains that None
522     # is not a valid type for integer. Defaulting to 0 to indicate to not
523     # use this and not to set a specific port mapping in cases like service
524     # change handler.
525     if container_port != 0:
526         # Doing this because other nodes might want to use this property
527         port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
528         ports = kwargs.get("ports", []) + [ port_mapping ]
529         kwargs["ports"] = ports
530     if "ports" not in kwargs:
531         ctx.logger.warn("No port mappings defined. Will randomly assign port.")
532
533     # All of the new node properties could be handled more DRYly!
534     # If a registration to MSB is required, then set up the registration info
535     if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
536         kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]
537
538     # If centralized logging via ELK is desired, then set up the logging info
539     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
540         kwargs["log_info"] = ctx.node.properties["log_info"]
541
542     # Pick up replica count and always_pull_image flag
543     if "replicas" in ctx.node.properties:
544         kwargs["replicas"] = ctx.node.properties["replicas"]
545     if "always_pull_image" in ctx.node.properties:
546         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
547     _create_and_start_container(service_component_name, image, **kwargs)
548
549     ctx.logger.info("Container started: {0}".format(service_component_name))
550
551     # Verify that the container is healthy
552
553     max_wait = kwargs.get("max_wait", 300)
554
555     if _verify_container(service_component_name, max_wait):
556         ctx.logger.info("Container is healthy: {0}".format(service_component_name))
557
558
559 @wrap_error_handling_start
560 @monkeypatch_loggers
561 @operation
562 def create_and_start_container(**kwargs):
563     """Create Docker container and start"""
564     service_component_name = ctx.node.properties["name"]
565     ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
566
567     image = ctx.node.properties["image"]
568
569     _create_and_start_container(service_component_name, image,**kwargs)
570     
571     ctx.logger.info("Component deployed: {0}".format(service_component_name))
572
573
574 @monkeypatch_loggers
575 @operation
576 def stop_and_remove_container(**kwargs):
577     """Stop and remove Docker container"""
578     try:
579         deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
580         k8sclient.undeploy(deployment_description)
581
582     except Exception as e:
583         ctx.logger.error("Unexpected error while stopping container: {0}"
584                 .format(str(e)))
585
586 @monkeypatch_loggers
587 @operation
588 def scale(replicas, **kwargs):
589     """Change number of replicas in the deployment"""
590     if replicas > 0:
591         current_replicas = ctx.instance.runtime_properties["replicas"]
592         ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas))
593         try:
594             deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
595             k8sclient.scale(deployment_description, replicas)
596             ctx.instance.runtime_properties["replicas"] = replicas
597         except Exception as e:
598             ctx.logger.error ("Unexpected error while scaling {0}".format(str(e)))
599     else:
600         ctx.logger.info("Ignoring request to scale to zero replicas")
601         
602 @monkeypatch_loggers
603 @Policies.cleanup_policies_on_node
604 @operation
605 def cleanup_discovery(**kwargs):
606     """Delete configuration from Consul"""
607     service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
608
609     try:
610         conn = dis.create_kv_conn(CONSUL_HOST)
611         dis.remove_service_component_config(conn, service_component_name)
612     except dis.DiscoveryConnectionError as e:
613         raise RecoverableError(e)
614
615
616 def _notify_container(**kwargs):
617     """Notify container using the policy section in the docker_config"""
618     dc = kwargs["docker_config"]
619
620     if "policy" in dc:
621         if dc["policy"]["trigger_type"] == "docker":
622             pass
623             """
624             Need replacement for this in kubernetes.
625             Need to find all the pods that have been deployed
626             and execute the script in them.
627             Kubernetes does not appear to have a way to ask for a script
628             to be executed in all of the currently running pods for a
629             Kubernetes Deployment or ReplicaSet.   We will have to find
630             each of them and run the script.   The problem is that set of
631             pods could be changing.   We can query to get all the pods, but
632             there's no guarantee the list won't change while we're trying to
633             execute the script.
634             
635             In ONAP R2, all of the policy-driven components rely on polling.
636             """
637             """
638             # REVIEW: Need to finalize on the docker config policy data structure
639             script_path = dc["policy"]["script_path"]
640             updated_policies = kwargs["updated_policies"]
641             removed_policies = kwargs["removed_policies"]
642             policies = kwargs["policies"]
643             cmd = doc.build_policy_update_cmd(script_path, use_sh=False,
644                     msg_type="policies",
645                     updated_policies=updated_policies,
646                     removed_policies=removed_policies,
647                     policies=policies
648                     )
649
650             docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
651             docker_host_ip = _lookup_service(docker_host)
652             logins = _get_docker_logins()
653             client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)
654
655             container_id = kwargs["container_id"]
656
657             doc.notify_for_policy_update(client, container_id, cmd)
658     """
659     # else the default is no trigger
660
661     return kwargs
662
663
664 @monkeypatch_loggers
665 @Policies.update_policies_on_node()
666 @operation
667 def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
668     """Policy update task
669
670     This method is responsible for updating the application configuration and
671     notifying the applications that the change has occurred. This is to be used
672     for the dcae.interfaces.policy.policy_update operation.
673
674     :updated_policies: contains the list of changed policy-configs when configs_only=True
675         (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
676     """
677     update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
678     update_inputs["updated_policies"] = updated_policies
679     update_inputs["removed_policies"] = removed_policies
680     update_inputs["policies"] = policies
681
682     _notify_container(**update_inputs)