ba71bd9e995bc36bfb3ae6e6c1922ac1012bffc3
[dcaegen2/platform/plugins.git] / k8s / k8splugin / tasks.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20
21 # Lifecycle interface calls for containerized components
22
23 # Needed by Cloudify Manager to load google.auth for the Kubernetes python client
24 import cloudify_importer
25
26 import time, copy
27 import json
28 from cloudify import ctx
29 from cloudify.decorators import operation
30 from cloudify.exceptions import NonRecoverableError, RecoverableError
31 from onap_dcae_dcaepolicy_lib import Policies
32 from k8splugin import discovery as dis
33 from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
34     merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
35 from k8splugin.exceptions import DockerPluginDeploymentError
36 from k8splugin import utils
37 from configure import configure
38 import k8sclient
39
40 # Get configuration
41 plugin_conf = configure.configure()
42 CONSUL_HOST = plugin_conf.get("consul_host")
43 CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
44 DCAE_NAMESPACE = plugin_conf.get("namespace")
45
46 # Used to construct delivery urls for data router subscribers. Data router in FTL
47 # requires https but this author believes that ONAP is to be defaulted to http.
48 DEFAULT_SCHEME = "http"
49
50 # Property keys
51 SERVICE_COMPONENT_NAME = "service_component_name"
52 CONTAINER_ID = "container_id"
53 APPLICATION_CONFIG = "application_config"
54 K8S_DEPLOYMENT = "k8s_deployment"
55
56 # Utility methods
57
58 # Lifecycle interface calls for dcae.nodes.DockerContainer
59
60 def _setup_for_discovery(**kwargs):
61     """Setup for config discovery"""
62     try:
63         name = kwargs['name']
64         application_config = kwargs[APPLICATION_CONFIG]
65
66         # NOTE: application_config is no longer a json string and is inputed as a
67         # YAML map which translates to a dict. We don't have to do any
68         # preprocessing anymore.
69         conn = dis.create_kv_conn(CONSUL_HOST)
70         dis.push_service_component_config(conn, name, application_config)
71         return kwargs
72     except dis.DiscoveryConnectionError as e:
73         raise RecoverableError(e)
74     except Exception as e:
75         ctx.logger.error("Unexpected error while pushing configuration: {0}"
76                 .format(str(e)))
77         raise NonRecoverableError(e)
78
79 def _generate_component_name(**kwargs):
80     """Generate component name"""
81     service_component_type = kwargs['service_component_type']
82     name_override = kwargs['service_component_name_override']
83
84     kwargs['name'] = name_override if name_override \
85             else dis.generate_service_component_name(service_component_type)
86     return kwargs
87
88 def _done_for_create(**kwargs):
89     """Wrap up create operation"""
90     name = kwargs['name']
91     kwargs[SERVICE_COMPONENT_NAME] = name
92     # All updates to the runtime_properties happens here. I don't see a reason
93     # why we shouldn't do this because the context is not being mutated by
94     # something else and will keep the other functions pure (pure in the sense
95     # not dealing with CloudifyContext).
96     ctx.instance.runtime_properties.update(kwargs)
97     ctx.logger.info("Done setting up: {0}".format(name))
98     return kwargs
99
100
101 @merge_inputs_for_create
102 @monkeypatch_loggers
103 @Policies.gather_policies_to_node()
104 @operation
105 def create_for_components(**create_inputs):
106     """Create step for service components
107
108     This interface is responsible for:
109
110     1. Generating service component name
111     2. Populating config information into Consul
112     """
113     _done_for_create(
114             **_setup_for_discovery(
115                 **_generate_component_name(
116                     **create_inputs)))
117
118
119 def _parse_streams(**kwargs):
120     """Parse streams and setup for DMaaP plugin"""
121     # The DMaaP plugin requires this plugin to set the runtime properties
122     # keyed by the node name.
123     def setup_publishes(s):
124         kwargs[s["name"]] = s
125
126     map(setup_publishes, kwargs["streams_publishes"])
127
128     def setup_subscribes(s):
129         if s["type"] == "data_router":
130             # If username and password has been provided then generate it. The
131             # DMaaP plugin doesn't generate for subscribers. The generation code
132             # and length of username password has been lifted from the DMaaP
133             # plugin.
134
135             # Don't want to mutate the source
136             s = copy.deepcopy(s)
137             if not s.get("username", None):
138                 s["username"] = utils.random_string(8)
139             if not s.get("password", None):
140                 s["password"] = utils.random_string(10)
141
142         kwargs[s["name"]] = s
143
144     # NOTE: That the delivery url is constructed and setup in the start operation
145     map(setup_subscribes, kwargs["streams_subscribes"])
146
147     return kwargs
148
149 def _setup_for_discovery_streams(**kwargs):
150     """Setup for discovery of streams
151
152     Specifically, there's a race condition this call addresses for data router
153     subscriber case. The component needs its feed subscriber information but the
154     DMaaP plugin doesn't provide this until after the docker plugin start
155     operation.
156     """
157     dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
158             if s["type"] == "data_router"]
159
160     if dr_subs:
161         dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
162         conn = dis.create_kv_conn(CONSUL_HOST)
163
164         def add_feed(dr_sub):
165             # delivery url and subscriber id will be fill by the dmaap plugin later
166             v = { "location": dr_sub["location"], "delivery_url": None,
167                     "username": dr_sub["username"], "password": dr_sub["password"],
168                     "subscriber_id": None }
169             return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None
170
171         try:
172             if all(map(add_feed, dr_subs)):
173                 return kwargs
174         except Exception as e:
175             raise NonRecoverableError(e)
176
177         # You should never get here
178         raise NonRecoverableError("Failure updating feed streams in Consul")
179     else:
180         return kwargs
181
182
183 @merge_inputs_for_create
184 @monkeypatch_loggers
185 @Policies.gather_policies_to_node()
186 @operation
187 def create_for_components_with_streams(**create_inputs):
188     """Create step for service components that use DMaaP
189
190     This interface is responsible for:
191
192     1. Generating service component name
193     2. Setup runtime properties for DMaaP plugin
194     3. Populating application config into Consul
195     4. Populating DMaaP config for data router subscribers in Consul
196     """
197     _done_for_create(
198             **_setup_for_discovery(
199                 **_setup_for_discovery_streams(
200                     **_parse_streams(
201                         **_generate_component_name(
202                             **create_inputs)))))
203
204
205 @merge_inputs_for_create
206 @monkeypatch_loggers
207 @operation
208 def create_for_platforms(**create_inputs):
209     """Create step for platform components
210
211     This interface is responible for:
212
213     1. Populating config information into Consul
214     """
215     _done_for_create(
216             **_setup_for_discovery(
217                 **create_inputs))
218
219
220 def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
221         with_port=False):
222     conn = dis.create_kv_conn(consul_host)
223     results = dis.lookup_service(conn, service_component_name)
224
225     if with_port:
226         # Just grab first
227         result = results[0]
228         return "{address}:{port}".format(address=result["ServiceAddress"],
229                 port=result["ServicePort"])
230     else:
231         return results[0]["ServiceAddress"]
232
233
234 def _verify_k8s_deployment(service_component_name, max_wait):
235     """Verify that the k8s Deployment is ready
236
237     Args:
238     -----
239     max_wait (integer): limit to how may attempts to make which translates to
240         seconds because each sleep is one second. 0 means infinite.
241
242     Return:
243     -------
244     True if deployment is ready else a DockerPluginDeploymentError exception
245     will be raised.
246     """
247     num_attempts = 1
248
249     while True:
250         if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
251             return True
252         else:
253             num_attempts += 1
254
255             if max_wait > 0 and max_wait < num_attempts:
256                 raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))
257
258             time.sleep(1)
259
260     return True
261
262 def _create_and_start_container(container_name, image, **kwargs):
263     '''
264     This will create a k8s Deployment and, if needed, a k8s Service or two.
265     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
266     We're not exposing k8s to the component developer and the blueprint author.
267     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
268     the details from the component developer and the blueprint author.)
269
270     kwargs may have:
271         - volumes:  array of volume objects, where a volume object is:
272             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
273         - ports: array of strings in the form "container_port:host_port"
274         - envs: map of name-value pairs ( {name0: value0, name1: value1...} )
275         - always_pull: boolean.  If true, sets image pull policy to "Always"
276           so that a fresh copy of the image is always pull.  Otherwise, sets
277           image pull policy to "IfNotPresent"
278         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
279         - log_info: an object with info for setting up ELK logging, with the form:
280             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
281         - replicas: number of replicas to be launched initially
282         - readiness: object with information needed to create a readiness check
283     '''
284     env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
285             "CONFIG_BINDING_SERVICE": "config-binding-service" }
286     env.update(kwargs.get("envs", {}))
287     ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
288     ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
289     replicas = kwargs.get("replicas", 1)
290     _,dep = k8sclient.deploy(DCAE_NAMESPACE,
291                      container_name,
292                      image,
293                      replicas = replicas,
294                      always_pull=kwargs.get("always_pull_image", False),
295                      k8sconfig=plugin_conf,
296                      volumes=kwargs.get("volumes",[]),
297                      ports=kwargs.get("ports",[]),
298                      msb_list=kwargs.get("msb_list"),
299                      tls_info=kwargs.get("tls_info"),
300                      env = env,
301                      labels = kwargs.get("labels", {}),
302                      log_info=kwargs.get("log_info"),
303                      readiness=kwargs.get("readiness"))
304
305     # Capture the result of deployment for future use
306     ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep
307     ctx.instance.runtime_properties["replicas"] = replicas
308     ctx.logger.info ("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))
309
310 def _parse_cloudify_context(**kwargs):
311     """Parse Cloudify context
312
313     Extract what is needed. This is impure function because it requires ctx.
314     """
315     kwargs["deployment_id"] = ctx.deployment.id
316
317     # Set some labels for the Kubernetes pods
318     kwargs["labels"] = {
319         "cfydeployment" : ctx.deployment.id,
320         "cfynode": ctx.node.name,
321         "cfynodeinstance": ctx.instance.id
322     }
323
324         # Pick up the centralized logging info
325     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
326         kwargs["log_info"] = ctx.node.properties["log_info"]
327
328     # Pick up TLS info if present
329     if "tls_info" in ctx.node.properties:
330         kwargs["tls_info"] = ctx.node.properties["tls_info"]
331
332     # Pick up replica count and always_pull_image flag
333     if "replicas" in ctx.node.properties:
334         kwargs["replicas"] = ctx.node.properties["replicas"]
335     if "always_pull_image" in ctx.node.properties:
336         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
337
338     return kwargs
339
340 def _enhance_docker_params(**kwargs):
341     '''
342     Set up Docker environment variables and readiness check info
343     and inject into kwargs.
344     '''
345
346     # Get info for setting up readiness probe, if present
347     docker_config = kwargs.get("docker_config", {})
348     if "healthcheck" in docker_config:
349         kwargs["readiness"] = docker_config["healthcheck"]
350
351     envs = kwargs.get("envs", {})
352
353     # Set tags on this component for its Consul registration as a service
354     tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
355     tags = [ str(tag) for tag in tags if tag is not None ]
356     # Registrator will use this to register this component with tags. Must be
357     # comma delimited.
358     envs["SERVICE_TAGS"] = ",".join(tags)
359
360     kwargs["envs"] = envs
361
362     def combine_params(key, docker_config, kwargs):
363         v = docker_config.get(key, []) + kwargs.get(key, [])
364         if v:
365             kwargs[key] = v
366         return kwargs
367
368     # Add the lists of ports and volumes unintelligently - meaning just add the
369     # lists together with no deduping.
370     kwargs = combine_params("ports", docker_config, kwargs)
371     kwargs = combine_params("volumes", docker_config, kwargs)
372
373
374     return kwargs
375
376 def _create_and_start_component(**kwargs):
377     """Create and start component (container)"""
378     image = kwargs["image"]
379     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
380     # Need to be picky and manually select out pieces because just using kwargs
381     # which contains everything confused the execution of
382     # _create_and_start_container because duplicate variables exist
383     sub_kwargs = {
384         "volumes": kwargs.get("volumes", []),
385         "ports": kwargs.get("ports", None),
386         "envs": kwargs.get("envs", {}),
387         "log_info": kwargs.get("log_info", {}),
388         "tls_info": kwargs.get("tls_info", {}),
389         "labels": kwargs.get("labels", {}),
390         "readiness": kwargs.get("readiness",{})}
391     _create_and_start_container(service_component_name, image, **sub_kwargs)
392
393     return kwargs
394
395 def _verify_component(**kwargs):
396     """Verify deployment is ready"""
397     service_component_name = kwargs[SERVICE_COMPONENT_NAME]
398
399     max_wait = kwargs.get("max_wait", 300)
400     ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
401
402     if _verify_k8s_deployment(service_component_name, max_wait):
403         ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
404
405     return kwargs
406
407 def _done_for_start(**kwargs):
408     ctx.instance.runtime_properties.update(kwargs)
409     ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
410     return kwargs
411
412 def _setup_msb_registration(service_name, msb_reg):
413     return {
414         "serviceName" : service_name,
415         "port" : msb_reg.get("port", "80"),
416         "version" : msb_reg.get("version", "v1"),
417         "url" : msb_reg.get("url_path", "/v1"),
418         "protocol" : "REST",
419         "enable_ssl" : msb_reg.get("uses_ssl", False),
420         "visualRange" : "1"
421 }
422
423 @wrap_error_handling_start
424 @merge_inputs_for_start
425 @monkeypatch_loggers
426 @operation
427 def create_and_start_container_for_components(**start_inputs):
428     """Initiate Kubernetes deployment for service components
429
430     This operation method is to be used with the ContainerizedServiceComponent
431     node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
432     that the app is up and responding successfully to readiness probes.
433     """
434     _done_for_start(
435             **_verify_component(
436                 **_create_and_start_component(
437                     **_enhance_docker_params(
438                         **_parse_cloudify_context(**start_inputs)))))
439
440
441 def _update_delivery_url(**kwargs):
442     """Update the delivery url for data router subscribers"""
443     dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
444             if s["type"] == "data_router"]
445
446     if dr_subs:
447         service_component_name = kwargs[SERVICE_COMPONENT_NAME]
448         # TODO: Should NOT be setting up the delivery url with ip addresses
449         # because in the https case, this will not work because data router does
450         # a certificate validation using the fqdn.
451         subscriber_host = _lookup_service(service_component_name, with_port=True)
452
453         for dr_sub in dr_subs:
454             scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
455             if "route" not in dr_sub:
456                 raise NonRecoverableError("'route' key missing from data router subscriber")
457             path = dr_sub["route"]
458             dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
459                     scheme=scheme, host=subscriber_host, path=path)
460             kwargs[dr_sub["name"]] = dr_sub
461
462     return kwargs
463
464 @wrap_error_handling_start
465 @merge_inputs_for_start
466 @monkeypatch_loggers
467 @operation
468 def create_and_start_container_for_components_with_streams(**start_inputs):
469     """Initiate Kubernetes deployment for service components that have streams
470
471     This operation method is to be used with the ContainerizedServiceComponentUsingDmaap
472     node type. After initiating the Kubernetes deployment, the plugin will verify with
473     Kubernetes that the app is up and responding successfully to readiness probes.
474     """
475     _done_for_start(
476             **_update_delivery_url(
477                 **_verify_component(
478                     **_create_and_start_component(
479                         **_enhance_docker_params(
480                             **_parse_cloudify_context(**start_inputs))))))
481
482
483 @wrap_error_handling_start
484 @monkeypatch_loggers
485 @operation
486 def create_and_start_container_for_platforms(**kwargs):
487     """Initiate Kubernetes deployment for platform components
488
489     This operation method is to be used with the ContainerizedPlatformComponent
490     node type.
491     """
492     # Capture node properties
493     image = ctx.node.properties["image"]
494     docker_config = ctx.node.properties.get("docker_config", {})
495     if "healthcheck" in docker_config:
496         kwargs["readiness"] = docker_config["healthcheck"]
497     if "dns_name" in ctx.node.properties:
498         service_component_name = ctx.node.properties["dns_name"]
499     else:
500         service_component_name = ctx.node.properties["name"]
501
502     # Set some labels for the Kubernetes pods
503     kwargs["labels"] = {
504         "cfydeployment" : ctx.deployment.id,
505         "cfynode": ctx.node.name,
506         "cfynodeinstance": ctx.instance.id
507     }
508
509     host_port = ctx.node.properties["host_port"]
510     container_port = ctx.node.properties["container_port"]
511
512     # Cloudify properties are all required and Cloudify complains that None
513     # is not a valid type for integer. Defaulting to 0 to indicate to not
514     # use this and not to set a specific port mapping in cases like service
515     # change handler.
516     if container_port != 0:
517         # Doing this because other nodes might want to use this property
518         port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
519         ports = kwargs.get("ports", []) + [ port_mapping ]
520         kwargs["ports"] = ports
521     if "ports" not in kwargs:
522         ctx.logger.warn("No port mappings defined. Will randomly assign port.")
523
524     # All of the new node properties could be handled more DRYly!
525     # If a registration to MSB is required, then set up the registration info
526     if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
527         kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]
528
529     # If centralized logging via ELK is desired, then set up the logging info
530     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
531         kwargs["log_info"] = ctx.node.properties["log_info"]
532
533     # Pick up TLS info if present
534     if "tls_info" in ctx.node.properties:
535         kwargs["tls_info"] = ctx.node.properties["tls_info"]
536
537     # Pick up replica count and always_pull_image flag
538     if "replicas" in ctx.node.properties:
539         kwargs["replicas"] = ctx.node.properties["replicas"]
540     if "always_pull_image" in ctx.node.properties:
541         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
542     _create_and_start_container(service_component_name, image, **kwargs)
543
544     # Verify that the k8s deployment is ready
545
546     max_wait = kwargs.get("max_wait", 300)
547     ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
548
549     if _verify_k8s_deployment(service_component_name, max_wait):
550         ctx.logger.info("k8s deployment ready for: {0}".format(service_component_name))
551
552
553 @wrap_error_handling_start
554 @monkeypatch_loggers
555 @operation
556 def create_and_start_container(**kwargs):
557     """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type"""
558     service_component_name = ctx.node.properties["name"]
559     ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
560
561     image = ctx.node.properties["image"]
562
563     _create_and_start_container(service_component_name, image,**kwargs)
564
565 @monkeypatch_loggers
566 @operation
567 def stop_and_remove_container(**kwargs):
568     """Delete Kubernetes deployment"""
569     if K8S_DEPLOYMENT in ctx.instance.runtime_properties:
570         try:
571             deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
572             k8sclient.undeploy(deployment_description)
573
574         except Exception as e:
575             ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
576                     .format(str(e)))
577     else:
578         # A previous install workflow may have failed,
579         # and no Kubernetes deployment info was recorded in runtime_properties.
580         # No need to run the undeploy operation
581         ctx.logger.info("No k8s deployment information, not attempting to delete k8s deployment")
582
583 @wrap_error_handling_update
584 @monkeypatch_loggers
585 @operation
586 def scale(replicas, **kwargs):
587     """Change number of replicas in the deployment"""
588     service_component_name = ctx.instance.runtime_properties["service_component_name"]
589
590     if replicas > 0:
591         current_replicas = ctx.instance.runtime_properties["replicas"]
592         ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
593         deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
594         k8sclient.scale(deployment_description, replicas)
595         ctx.instance.runtime_properties["replicas"] = replicas
596
597         # Verify that the scaling took place as expected
598         max_wait = kwargs.get("max_wait", 300)
599         ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
600         if _verify_k8s_deployment(service_component_name, max_wait):
601             ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
602
603     else:
604         ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))
605
606 @wrap_error_handling_update
607 @monkeypatch_loggers
608 @operation
609 def update_image(image, **kwargs):
610     """ Restart component with a new Docker image """
611
612     service_component_name = ctx.instance.runtime_properties["service_component_name"]
613     if image:
614         current_image = ctx.instance.runtime_properties["image"]
615         ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
616         deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
617         k8sclient.upgrade(deployment_description, image)
618         ctx.instance.runtime_properties["image"] = image
619
620         # Verify that the update took place as expected
621         max_wait = kwargs.get("max_wait", 300)
622         ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
623         if _verify_k8s_deployment(service_component_name, max_wait):
624             ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
625
626     else:
627         ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))
628
629 #TODO: implement rollback operation when kubernetes python client fix is available.
630 # (See comments in k8sclient.py.)
631 # In the meantime, it's possible to undo an update_image operation by doing a second
632 # update_image that specifies the older image.
633
634 @monkeypatch_loggers
635 @Policies.cleanup_policies_on_node
636 @operation
637 def cleanup_discovery(**kwargs):
638     """Delete configuration from Consul"""
639     if SERVICE_COMPONENT_NAME in ctx.instance.runtime_properties:
640         service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
641
642         try:
643             conn = dis.create_kv_conn(CONSUL_HOST)
644             dis.remove_service_component_config(conn, service_component_name)
645         except dis.DiscoveryConnectionError as e:
646             raise RecoverableError(e)
647     else:
648         # When another node in the blueprint fails install,
649         # this node may not have generated a service component name.
650         # There's nothing to delete from Consul.
651         ctx.logger.info ("No service_component_name, not attempting to delete config from Consul")
652
653 def _notify_container(**kwargs):
654     """
655     Notify container using the policy section in the docker_config.
656     Notification consists of running a script in the application container
657     in each pod in the Kubernetes deployment associated with this node.
658     Return the list of notification results.
659     """
660     dc = kwargs["docker_config"]
661     resp = []
662
663     if "policy" in dc:
664         if dc["policy"]["trigger_type"] == "docker":
665
666              # Build the command to execute in the container
667              # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...}
668             script_path = dc["policy"]["script_path"]
669             policy_data = {
670                 "policies": kwargs["policies"],
671                 "updated_policies": kwargs["updated_policies"],
672                 "removed_policies": kwargs["removed_policies"]
673             }
674
675             command = [script_path, "policies", json.dumps(policy_data)]
676
677             # Execute the command
678             deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
679             resp = k8sclient.execute_command_in_deployment(deployment_description, command)
680
681     # else the default is no trigger
682
683     return resp
684
685 @operation
686 @monkeypatch_loggers
687 @Policies.update_policies_on_node()
688 def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
689     """Policy update task
690
691     This method is responsible for updating the application configuration and
692     notifying the applications that the change has occurred. This is to be used
693     for the dcae.interfaces.policy.policy_update operation.
694
695     :updated_policies: contains the list of changed policy-configs when configs_only=True
696         (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
697     """
698     service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
699     ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
700         .format(service_component_name, updated_policies, removed_policies, policies))
701     update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
702     update_inputs["updated_policies"] = updated_policies
703     update_inputs["removed_policies"] = removed_policies
704     update_inputs["policies"] = policies
705
706     resp = _notify_container(**update_inputs)
707     ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name,json.dumps(resp)))