Add support for updating image.
[dcaegen2/platform/plugins.git] / k8s / k8splugin / tasks.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20
21 # Lifecycle interface calls for containerized components
22
23 # Needed by Cloudify Manager to load google.auth for the Kubernetes python client
24 import cloudify_importer
25
26 import time, copy
27 from cloudify import ctx
28 from cloudify.decorators import operation
29 from cloudify.exceptions import NonRecoverableError, RecoverableError
30 from onap_dcae_dcaepolicy_lib import Policies
31 from k8splugin import discovery as dis
32 from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
33     merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
34 from k8splugin.exceptions import DockerPluginDeploymentError
35 from k8splugin import utils
36 from configure import configure
37 import k8sclient
38
39 # Get configuration
40 plugin_conf = configure.configure()
41 CONSUL_HOST = plugin_conf.get("consul_host")
42 CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
43 DCAE_NAMESPACE = plugin_conf.get("namespace")
44
45 # Used to construct delivery urls for data router subscribers. Data router in FTL
46 # requires https but this author believes that ONAP is to be defaulted to http.
47 DEFAULT_SCHEME = "http"
48
49 # Property keys
50 SERVICE_COMPONENT_NAME = "service_component_name"
51 CONTAINER_ID = "container_id"
52 APPLICATION_CONFIG = "application_config"
53
54
55
56 # Utility methods
57
58 # Lifecycle interface calls for dcae.nodes.DockerContainer
59
60 def _setup_for_discovery(**kwargs):
61     """Setup for config discovery"""
62     try:
63         name = kwargs['name']
64         application_config = kwargs[APPLICATION_CONFIG]
65
66         # NOTE: application_config is no longer a json string and is inputed as a
67         # YAML map which translates to a dict. We don't have to do any
68         # preprocessing anymore.
69         conn = dis.create_kv_conn(CONSUL_HOST)
70         dis.push_service_component_config(conn, name, application_config)
71         return kwargs
72     except dis.DiscoveryConnectionError as e:
73         raise RecoverableError(e)
74     except Exception as e:
75         ctx.logger.error("Unexpected error while pushing configuration: {0}"
76                 .format(str(e)))
77         raise NonRecoverableError(e)
78
79 def _generate_component_name(**kwargs):
80     """Generate component name"""
81     service_component_type = kwargs['service_component_type']
82     name_override = kwargs['service_component_name_override']
83
84     kwargs['name'] = name_override if name_override \
85             else dis.generate_service_component_name(service_component_type)
86     return kwargs
87
88 def _done_for_create(**kwargs):
89     """Wrap up create operation"""
90     name = kwargs['name']
91     kwargs[SERVICE_COMPONENT_NAME] = name
92     # All updates to the runtime_properties happens here. I don't see a reason
93     # why we shouldn't do this because the context is not being mutated by
94     # something else and will keep the other functions pure (pure in the sense
95     # not dealing with CloudifyContext).
96     ctx.instance.runtime_properties.update(kwargs)
97     ctx.logger.info("Done setting up: {0}".format(name))
98     return kwargs
99
100
101 @merge_inputs_for_create
102 @monkeypatch_loggers
103 @Policies.gather_policies_to_node()
104 @operation
105 def create_for_components(**create_inputs):
106     """Create step for Docker containers that are components
107
108     This interface is responsible for:
109
110     1. Generating service component name
111     2. Populating config information into Consul
112     """
113     _done_for_create(
114             **_setup_for_discovery(
115                 **_generate_component_name(
116                     **create_inputs)))
117
118
119 def _parse_streams(**kwargs):
120     """Parse streams and setup for DMaaP plugin"""
121     # The DMaaP plugin requires this plugin to set the runtime properties
122     # keyed by the node name.
123     def setup_publishes(s):
124         kwargs[s["name"]] = s
125
126     map(setup_publishes, kwargs["streams_publishes"])
127
128     def setup_subscribes(s):
129         if s["type"] == "data_router":
130             # If username and password has been provided then generate it. The
131             # DMaaP plugin doesn't generate for subscribers. The generation code
132             # and length of username password has been lifted from the DMaaP
133             # plugin.
134
135             # Don't want to mutate the source
136             s = copy.deepcopy(s)
137             if not s.get("username", None):
138                 s["username"] = utils.random_string(8)
139             if not s.get("password", None):
140                 s["password"] = utils.random_string(10)
141
142         kwargs[s["name"]] = s
143
144     # NOTE: That the delivery url is constructed and setup in the start operation
145     map(setup_subscribes, kwargs["streams_subscribes"])
146
147     return kwargs
148
149 def _setup_for_discovery_streams(**kwargs):
150     """Setup for discovery of streams
151
152     Specifically, there's a race condition this call addresses for data router
153     subscriber case. The component needs its feed subscriber information but the
154     DMaaP plugin doesn't provide this until after the docker plugin start
155     operation.
156     """
157     dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
158             if s["type"] == "data_router"]
159
160     if dr_subs:
161         dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
162         conn = dis.create_kv_conn(CONSUL_HOST)
163
164         def add_feed(dr_sub):
165             # delivery url and subscriber id will be fill by the dmaap plugin later
166             v = { "location": dr_sub["location"], "delivery_url": None,
167                     "username": dr_sub["username"], "password": dr_sub["password"],
168                     "subscriber_id": None }
169             return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None
170
171         try:
172             if all(map(add_feed, dr_subs)):
173                 return kwargs
174         except Exception as e:
175             raise NonRecoverableError(e)
176
177         # You should never get here
178         raise NonRecoverableError("Failure updating feed streams in Consul")
179     else:
180         return kwargs
181
182
183 @merge_inputs_for_create
184 @monkeypatch_loggers
185 @Policies.gather_policies_to_node()
186 @operation
187 def create_for_components_with_streams(**create_inputs):
188     """Create step for Docker containers that are components that use DMaaP
189
190     This interface is responsible for:
191
192     1. Generating service component name
193     2. Setup runtime properties for DMaaP plugin
194     3. Populating application config into Consul
195     4. Populating DMaaP config for data router subscribers in Consul
196     """
197     _done_for_create(
198             **_setup_for_discovery(
199                 **_setup_for_discovery_streams(
200                     **_parse_streams(
201                         **_generate_component_name(
202                             **create_inputs)))))
203
204
205 @merge_inputs_for_create
206 @monkeypatch_loggers
207 @operation
208 def create_for_platforms(**create_inputs):
209     """Create step for Docker containers that are platform components
210
211     This interface is responible for:
212
213     1. Populating config information into Consul
214     """
215     _done_for_create(
216             **_setup_for_discovery(
217                 **create_inputs))
218
219
220 def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
221         with_port=False):
222     conn = dis.create_kv_conn(consul_host)
223     results = dis.lookup_service(conn, service_component_name)
224
225     if with_port:
226         # Just grab first
227         result = results[0]
228         return "{address}:{port}".format(address=result["ServiceAddress"],
229                 port=result["ServicePort"])
230     else:
231         return results[0]["ServiceAddress"]
232
233
234 def _verify_container(service_component_name, max_wait):
235     """Verify that the container is healthy
236
237     Args:
238     -----
239     max_wait (integer): limit to how may attempts to make which translates to
240         seconds because each sleep is one second. 0 means infinite.
241
242     Return:
243     -------
244     True if component is healthy else a DockerPluginDeploymentError exception
245     will be raised.
246     """
247     num_attempts = 1
248
249     while True:
250         if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
251             return True
252         else:
253             num_attempts += 1
254
255             if max_wait > 0 and max_wait < num_attempts:
256                 raise DockerPluginDeploymentError("Container never became healthy")
257
258             time.sleep(1)
259
260     return True
261
262 def _create_and_start_container(container_name, image, **kwargs):
263     '''
264     This will create a k8s Deployment and, if needed, a k8s Service or two.
265     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
266     We're not exposing k8s to the component developer and the blueprint author.
267     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
268     the details from the component developer and the blueprint author.)
269
270     kwargs may have:
271         - volumes:  array of volume objects, where a volume object is:
272             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
273         - ports: array of strings in the form "container_port:host_port"
274         - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
275         - always_pull: boolean.  If true, sets image pull policy to "Always"
276           so that a fresh copy of the image is always pull.  Otherwise, sets
277           image pull policy to "IfNotPresent"
278         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
279         - log_info: an object with info for setting up ELK logging, with the form:
280             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
281         - replicas: number of replicas to be launched initially
282         - readiness: object with information needed to create a readiness check
283     '''
284     env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
285             "CONFIG_BINDING_SERVICE": "config-binding-service" }
286     env.update(kwargs.get("envs", {}))
287     ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
288     ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
289     replicas = kwargs.get("replicas", 1)
290     _,dep = k8sclient.deploy(DCAE_NAMESPACE,
291                      container_name,
292                      image,
293                      replicas = replicas,
294                      always_pull=kwargs.get("always_pull_image", False),
295                      k8sconfig=plugin_conf,
296                      volumes=kwargs.get("volumes",[]),
297                      ports=kwargs.get("ports",[]),
298                      msb_list=kwargs.get("msb_list"),
299                      env = env,
300                      labels = kwargs.get("labels", {}),
301                      log_info=kwargs.get("log_info"),
302                      readiness=kwargs.get("readiness"))
303
304     # Capture the result of deployment for future use
305     ctx.instance.runtime_properties["k8s_deployment"] = dep
306     ctx.instance.runtime_properties["replicas"] = replicas
307     ctx.logger.info ("Deployment complete: {0}".format(dep))
308
309 def _parse_cloudify_context(**kwargs):
310     """Parse Cloudify context
311
312     Extract what is needed. This is impure function because it requires ctx.
313     """
314     kwargs["deployment_id"] = ctx.deployment.id
315
316     # Set some labels for the Kubernetes pods
317     kwargs["labels"] = {
318         "cfydeployment" : ctx.deployment.id,
319         "cfynode": ctx.node.name,
320         "cfynodeinstance": ctx.instance.id
321     }
322
323         # Pick up the centralized logging info
324     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
325         kwargs["log_info"] = ctx.node.properties["log_info"]
326
327     # Pick up replica count and always_pull_image flag
328     if "replicas" in ctx.node.properties:
329         kwargs["replicas"] = ctx.node.properties["replicas"]
330     if "always_pull_image" in ctx.node.properties:
331         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
332
333     return kwargs
334
335 def _enhance_docker_params(**kwargs):
336     '''
337     Set up Docker environment variables and readiness check info
338     and inject into kwargs.
339     '''
340
341     # Get info for setting up readiness probe, if present
342     docker_config = kwargs.get("docker_config", {})
343     if "healthcheck" in docker_config:
344         kwargs["readiness"] = docker_config["healthcheck"]
345
346     envs = kwargs.get("envs", {})
347
348     # Set tags on this component for its Consul registration as a service
349     tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
350     tags = [ str(tag) for tag in tags if tag is not None ]
351     # Registrator will use this to register this component with tags. Must be
352     # comma delimited.
353     envs["SERVICE_TAGS"] = ",".join(tags)
354
355     kwargs["envs"] = envs
356
357     def combine_params(key, docker_config, kwargs):
358         v = docker_config.get(key, []) + kwargs.get(key, [])
359         if v:
360             kwargs[key] = v
361         return kwargs
362
363     # Add the lists of ports and volumes unintelligently - meaning just add the
364     # lists together with no deduping.
365     kwargs = combine_params("ports", docker_config, kwargs)
366     kwargs = combine_params("volumes", docker_config, kwargs)
367
368
369     return kwargs
370
371 def _create_and_start_component(**kwargs):
372     """Create and start component (container)"""
373     image = kwargs["image"]
374     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
375     # Need to be picky and manually select out pieces because just using kwargs
376     # which contains everything confused the execution of
377     # _create_and_start_container because duplicate variables exist
378     sub_kwargs = {
379         "volumes": kwargs.get("volumes", []),
380         "ports": kwargs.get("ports", None),
381         "envs": kwargs.get("envs", {}),
382         "log_info": kwargs.get("log_info", {}),
383         "labels": kwargs.get("labels", {}),
384         "readiness": kwargs.get("readiness",{})}
385     _create_and_start_container(service_component_name, image, **sub_kwargs)
386
387     # TODO: Use regular logging here
388     ctx.logger.info("Container started: {0}".format(service_component_name))
389
390     return kwargs
391
392 def _verify_component(**kwargs):
393     """Verify component (container) is healthy"""
394     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
395
396     max_wait = kwargs.get("max_wait", 300)
397
398     # Verify that the container is healthy
399
400     if _verify_container(service_component_name, max_wait):
401         service_component_name = kwargs[SERVICE_COMPONENT_NAME]
402
403         # TODO: Use regular logging here
404         ctx.logger.info("Container is healthy: {0}".format(service_component_name))
405
406     return kwargs
407
408 def _done_for_start(**kwargs):
409     ctx.instance.runtime_properties.update(kwargs)
410     ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
411     return kwargs
412
413 def _setup_msb_registration(service_name, msb_reg):
414     return {
415         "serviceName" : service_name,
416         "port" : msb_reg.get("port", "80"),
417         "version" : msb_reg.get("version", "v1"),
418         "url" : msb_reg.get("url_path", "/v1"),
419         "protocol" : "REST",
420         "enable_ssl" : msb_reg.get("uses_ssl", False),
421         "visualRange" : "1"
422 }
423
424 @wrap_error_handling_start
425 @merge_inputs_for_start
426 @monkeypatch_loggers
427 @operation
428 def create_and_start_container_for_components(**start_inputs):
429     """Create Docker container and start for components
430
431     This operation method is to be used with the DockerContainerForComponents
432     node type. After launching the container, the plugin will verify with Consul
433     that the app is up and healthy before terminating.
434     """
435     _done_for_start(
436             **_verify_component(
437                 **_create_and_start_component(
438                     **_enhance_docker_params(
439                         **_parse_cloudify_context(**start_inputs)))))
440
441
442 def _update_delivery_url(**kwargs):
443     """Update the delivery url for data router subscribers"""
444     dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
445             if s["type"] == "data_router"]
446
447     if dr_subs:
448         service_component_name = kwargs[SERVICE_COMPONENT_NAME]
449         # TODO: Should NOT be setting up the delivery url with ip addresses
450         # because in the https case, this will not work because data router does
451         # a certificate validation using the fqdn.
452         subscriber_host = _lookup_service(service_component_name, with_port=True)
453
454         for dr_sub in dr_subs:
455             scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
456             if "route" not in dr_sub:
457                 raise NonRecoverableError("'route' key missing from data router subscriber")
458             path = dr_sub["route"]
459             dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
460                     scheme=scheme, host=subscriber_host, path=path)
461             kwargs[dr_sub["name"]] = dr_sub
462
463     return kwargs
464
465 @wrap_error_handling_start
466 @merge_inputs_for_start
467 @monkeypatch_loggers
468 @operation
469 def create_and_start_container_for_components_with_streams(**start_inputs):
470     """Create Docker container and start for components that have streams
471
472     This operation method is to be used with the DockerContainerForComponents
473     node type. After launching the container, the plugin will verify with Consul
474     that the app is up and healthy before terminating.
475     """
476     _done_for_start(
477             **_update_delivery_url(
478                 **_verify_component(
479                     **_create_and_start_component(
480                         **_enhance_docker_params(
481                             **_parse_cloudify_context(**start_inputs))))))
482
483
484 @wrap_error_handling_start
485 @monkeypatch_loggers
486 @operation
487 def create_and_start_container_for_platforms(**kwargs):
488     """Create Docker container and start for platform services
489
490     This operation method is to be used with the ContainerizedPlatformComponent
491     node type.
492     """
493     # Capture node properties
494     image = ctx.node.properties["image"]
495     docker_config = ctx.node.properties.get("docker_config", {})
496     if "healthcheck" in docker_config:
497         kwargs["readiness"] = docker_config["healthcheck"]
498     if "dns_name" in ctx.node.properties:
499         service_component_name = ctx.node.properties["dns_name"]
500     else:
501         service_component_name = ctx.node.properties["name"]
502
503     # Set some labels for the Kubernetes pods
504     kwargs["labels"] = {
505         "cfydeployment" : ctx.deployment.id,
506         "cfynode": ctx.node.name,
507         "cfynodeinstance": ctx.instance.id
508     }
509
510     host_port = ctx.node.properties["host_port"]
511     container_port = ctx.node.properties["container_port"]
512
513     # Cloudify properties are all required and Cloudify complains that None
514     # is not a valid type for integer. Defaulting to 0 to indicate to not
515     # use this and not to set a specific port mapping in cases like service
516     # change handler.
517     if container_port != 0:
518         # Doing this because other nodes might want to use this property
519         port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
520         ports = kwargs.get("ports", []) + [ port_mapping ]
521         kwargs["ports"] = ports
522     if "ports" not in kwargs:
523         ctx.logger.warn("No port mappings defined. Will randomly assign port.")
524
525     # All of the new node properties could be handled more DRYly!
526     # If a registration to MSB is required, then set up the registration info
527     if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
528         kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]
529
530     # If centralized logging via ELK is desired, then set up the logging info
531     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
532         kwargs["log_info"] = ctx.node.properties["log_info"]
533
534     # Pick up replica count and always_pull_image flag
535     if "replicas" in ctx.node.properties:
536         kwargs["replicas"] = ctx.node.properties["replicas"]
537     if "always_pull_image" in ctx.node.properties:
538         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
539     _create_and_start_container(service_component_name, image, **kwargs)
540
541     ctx.logger.info("Container started: {0}".format(service_component_name))
542
543     # Verify that the container is healthy
544
545     max_wait = kwargs.get("max_wait", 300)
546
547     if _verify_container(service_component_name, max_wait):
548         ctx.logger.info("Container is healthy: {0}".format(service_component_name))
549
550
551 @wrap_error_handling_start
552 @monkeypatch_loggers
553 @operation
554 def create_and_start_container(**kwargs):
555     """Create Docker container and start"""
556     service_component_name = ctx.node.properties["name"]
557     ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
558
559     image = ctx.node.properties["image"]
560
561     _create_and_start_container(service_component_name, image,**kwargs)
562
563     ctx.logger.info("Component deployed: {0}".format(service_component_name))
564
565
566 @monkeypatch_loggers
567 @operation
568 def stop_and_remove_container(**kwargs):
569     """Stop and remove Docker container"""
570     try:
571         deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
572         k8sclient.undeploy(deployment_description)
573
574     except Exception as e:
575         ctx.logger.error("Unexpected error while stopping container: {0}"
576                 .format(str(e)))
577
578 @wrap_error_handling_update
579 @monkeypatch_loggers
580 @operation
581 def scale(replicas, **kwargs):
582     """Change number of replicas in the deployment"""
583     if replicas > 0:
584         current_replicas = ctx.instance.runtime_properties["replicas"]
585         ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas))
586         deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
587         k8sclient.scale(deployment_description, replicas)
588         ctx.instance.runtime_properties["replicas"] = replicas
589
590         # Verify that the scaling took place as expected
591         max_wait = kwargs.get("max_wait", 300)
592         service_component_name = ctx.instance.runtime_properties["service_component_name"]
593         if _verify_container(service_component_name, max_wait):
594             ctx.logger.info("Scaling complete : {0} from {1} to {2} instance(s)".format(service_component_name, current_replicas, replicas))
595
596     else:
597         ctx.logger.info("Ignoring request to scale to zero replicas")
598
599 @wrap_error_handling_update
600 @monkeypatch_loggers
601 @operation
602 def update_image(image, **kwargs):
603     if image:
604         current_image = ctx.instance.runtime_properties["image"]
605         ctx.logger.info("Updating application container image from {0} to {1}".format(current_image, image))
606         deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
607         k8sclient.upgrade(deployment_description, image)
608         ctx.instance.runtime_properties["image"] = image
609
610         # Verify that the update took place as expected
611         max_wait = kwargs.get("max_wait", 300)
612         service_component_name = ctx.instance.runtime_properties["service_component_name"]
613         if _verify_container(service_component_name, max_wait):
614             ctx.logger.info("Update complete : {0} from {1} to {2} instance(s)".format(service_component_name, current_image, image))
615
616     else:
617         ctx.logger.info("Ignoring update_image request with unusable image '{0}'".format(str(image)))
618
619 #TODO: implement rollback operation when kubernetes python client fix is available.
620 # (See comments in k8sclient.py.)
621 # In the meantime, it's possible to undo an update_image operation by doing a second
622 # update_image that specifies the older image.
623
624 @monkeypatch_loggers
625 @Policies.cleanup_policies_on_node
626 @operation
627 def cleanup_discovery(**kwargs):
628     """Delete configuration from Consul"""
629     service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
630
631     try:
632         conn = dis.create_kv_conn(CONSUL_HOST)
633         dis.remove_service_component_config(conn, service_component_name)
634     except dis.DiscoveryConnectionError as e:
635         raise RecoverableError(e)
636
637
638 def _notify_container(**kwargs):
639     """Notify container using the policy section in the docker_config"""
640     dc = kwargs["docker_config"]
641
642     if "policy" in dc:
643         if dc["policy"]["trigger_type"] == "docker":
644             pass
645             """
646             Need replacement for this in kubernetes.
647             Need to find all the pods that have been deployed
648             and execute the script in them.
649             Kubernetes does not appear to have a way to ask for a script
650             to be executed in all of the currently running pods for a
651             Kubernetes Deployment or ReplicaSet.   We will have to find
652             each of them and run the script.   The problem is that set of
653             pods could be changing.   We can query to get all the pods, but
654             there's no guarantee the list won't change while we're trying to
655             execute the script.
656
657             In ONAP R2, all of the policy-driven components rely on polling.
658             """
659             """
660             # REVIEW: Need to finalize on the docker config policy data structure
661             script_path = dc["policy"]["script_path"]
662             updated_policies = kwargs["updated_policies"]
663             removed_policies = kwargs["removed_policies"]
664             policies = kwargs["policies"]
665             cmd = doc.build_policy_update_cmd(script_path, use_sh=False,
666                     msg_type="policies",
667                     updated_policies=updated_policies,
668                     removed_policies=removed_policies,
669                     policies=policies
670                     )
671
672             docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
673             docker_host_ip = _lookup_service(docker_host)
674             logins = _get_docker_logins()
675             client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)
676
677             container_id = kwargs["container_id"]
678
679             doc.notify_for_policy_update(client, container_id, cmd)
680     """
681     # else the default is no trigger
682
683     return kwargs
684
685
686 @monkeypatch_loggers
687 @Policies.update_policies_on_node()
688 @operation
689 def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
690     """Policy update task
691
692     This method is responsible for updating the application configuration and
693     notifying the applications that the change has occurred. This is to be used
694     for the dcae.interfaces.policy.policy_update operation.
695
696     :updated_policies: contains the list of changed policy-configs when configs_only=True
697         (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
698     """
699     update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
700     update_inputs["updated_policies"] = updated_policies
701     update_inputs["removed_policies"] = removed_policies
702     update_inputs["policies"] = policies
703
704     _notify_container(**update_inputs)